In [None]:
# Load the SageMaker Studio Analytics extension and connect to the EMR cluster
# The first line loads the custom magics for SageMaker Studio Analytics.
# The second line connects to the specified EMR cluster using the provided cluster ID
%load_ext sagemaker_studio_analytics_extension.magics
%sm_analytics emr connect --verify-certificate False --cluster-id j-BMCH4XQC5JW5 --auth-type None --language python  




Successfully read emr cluster(j-BMCH4XQC5JW5) details


Initiating EMR connection..
Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1756611020725_0003,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


INFO:278267777be742d59f36789f6c9282f2:{"LibraryVersion": "0.2.0", "Service": "emr", "Operation": "connect", "AccountId": "046078786454", "EventTimeStampMillis": 1756620878774.57, "Exception": null, "ExceptionString": null, "Error": 0, "Fault": 0, "Success": 1, "AuthType": "no-auth", "ConnectionProtocol": "http", "StackTrace": null, "OperationStartTimeMillis": 1756620835314.8145, "OperationEndTimeMillis": 1756620878774.562, "OperationDurationMillis": 43459.74755859375, "KernelName": "PySparkKernel", "ClusterId": "j-BMCH4XQC5JW5", "VerifyCertificate": "False"}


{"namespace": "sagemaker-analytics", "cluster_id": "j-BMCH4XQC5JW5", "error_message": null, "success": true, "service": "emr", "operation": "connect"}


## Exploring and Querying Data with SparkMagic PySpark
 
In this notebook, we explore and query data using SparkMagic PySpark while connected to an Amazon EMR cluster.

### Checking Out the Session Info
 
Because we're using the PySpark kernel, we can run the %%info magic to see the details of our current session.

In [None]:
%%info
# The %%info cell magic has to be the first line of the cell, so these comments follow it.
# It lists the session ID, YARN application ID, kind, state, and links to the Spark UI and driver log.

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1756611020725_0003,pyspark,idle,Link,Link,,✔


### Exploring and Querying the Data
 
The PySpark kernel gives us a general-purpose SQLContext, but here we use HiveContext, which adds access to tables registered in the Hive metastore along with Hive features such as HiveQL and Hive UDFs. Note that HiveContext has been deprecated since Spark 2.0 in favor of a SparkSession with Hive support enabled.

When we connect to the EMR cluster, SparkContext and HiveContext are set up for us. We use HiveContext to run SQL queries on Hive tables and get the results back as Spark dataframes.

First, we use HiveContext to list the available databases and tables.

In [None]:
# Query Hive databases and tables using HiveContext
# Re-initialize sqlContext as a HiveContext to enable Hive-specific features.
sqlContext = HiveContext(sqlContext)
# List all databases in Hive.
dbs = sqlContext.sql("show databases")
dbs.show()
# List all tables in the current database.
tables = sqlContext.sql("show tables")
tables.show()

+------------+
|databaseName|
+------------+
|     default|
+------------+

+--------+----------+-----------+
|database| tableName|isTemporary|
+--------+----------+-----------+
| default|adult_data|      false|
+--------+----------+-----------+

We read the 'adult_data' table into a Spark dataframe.

In [None]:
# Query the 'adult_data' Hive table and mark the resulting Spark dataframe for caching.
# cache() is lazy: the data is materialized by the first action (such as count()) and reused by later ones.
adult_df = sqlContext.sql("select * from adult_data").cache()

We check the shape of the dataframe and print its schema to get a feel for the data.

In [None]:
# View the shape of the dataframe (number of rows, number of columns)
print((adult_df.count(), len(adult_df.columns)))
# head(5) returns the first 5 rows as a list of Row objects; the result is not printed here
adult_df.head(5)
# Print the schema of the dataframe to see column types and structure
adult_df.printSchema()

(1000, 15)
root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_per_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)

To make the output easier to read, we convert the first five rows of the Spark dataframe to a Pandas dataframe.

In [None]:
# Convert the first 5 rows of the Spark dataframe to a Pandas dataframe for cleaner output.
adult_df.limit(5).toPandas()

   age   workclass  fnlwgt  ...  hours_per_week  native_country  income
0  age   workclass  fnlwgt  ...  hours_per_week  native_country  income
1   25     Private  226802  ...              40   United-States   <=50K
2   38     Private   89814  ...              50   United-States   <=50K
3   28   Local-gov  336951  ...              40   United-States    >50K
4   44     Private  160323  ...              40   United-States    >50K

[5 rows x 15 columns]

Some ML algorithms (such as linear regression) require numeric features, but our dataset has several categorical columns, such as workclass, education, and occupation. Note also that row 0 of the output above repeats the column names, which suggests the file's header line was loaded as a data row.

We use StringIndexer and OneHotEncoder to convert those categorical columns into numeric ones. StringIndexer maps each distinct string to a numeric label index, and OneHotEncoder turns each index into a sparse binary vector with at most a single 1.

We apply StringIndexer first, then OneHotEncoder. See the docs for [StringIndexer](https://spark.apache.org/docs/latest/ml-features.html#stringindexer) and [OneHotEncoder](https://spark.apache.org/docs/latest/ml-features.html#onehotencoder) for more details.
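
As a rough illustration of what these two steps compute, here is a plain-Python sketch (not Spark's implementation). It assumes the default settings: StringIndexer orders labels by descending frequency, so the most frequent value gets index 0, and OneHotEncoder drops the last category, so the last index maps to an all-zero vector. The helper names fit_string_index and one_hot and the toy values are hypothetical.

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct string to an index, most frequent first.

    Ties are broken alphabetically here; this only illustrates the
    default frequency-descending ordering, it is not Spark code.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Return a dense 0/1 list for one label index.

    With drop_last=True the vector has num_categories - 1 slots and the
    last category is encoded as all zeros.
    """
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector


# Toy column (hypothetical values, not taken from adult_data)
workclass = ["Private", "Private", "Local-gov", "Private", "State-gov", "Local-gov"]
mapping = fit_string_index(workclass)
encoded = [one_hot(mapping[value], len(mapping)) for value in workclass]
print(mapping)
print(encoded[0], encoded[2], encoded[4])
```

In this toy column, "Private" is the most frequent value and gets index 0, while "State-gov" is the rarest, gets the last index, and is therefore encoded as all zeros.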

In [None]:
# Convert categorical variables to numeric using StringIndexer and OneHotEncoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder
# List of categorical columns to be encoded
categorical_variables = ['workclass', 'education', 'marital_status', 'occupation', 'relationship', 'race', 'sex', 'native_country']
# Create a StringIndexer for each categorical column.
# The "-index" suffix matches the column names in the schema printed after the pipeline runs.
indexers = [StringIndexer(inputCol=column, outputCol=column + "-index") for column in categorical_variables]
# Create a OneHotEncoder for each indexed column
encoders = [OneHotEncoder(inputCol=column + "-index", outputCol=column + "_ohe") for column in categorical_variables]

We use VectorAssembler to combine the one-hot encoded columns into a single vector column.

For more on VectorAssembler, see the [docs](https://spark.apache.org/docs/latest/ml-features.html#vectorassembler).
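
Conceptually, VectorAssembler concatenates its input columns, in order, into one vector. The plain-Python sketch below only illustrates that idea and is not Spark's code: each input is a hypothetical (size, indices, values) triple, and the hypothetical helper assemble shifts every index by the total size of the vectors that come before it.

```python
def assemble(vectors):
    """Concatenate sparse vectors given as (size, indices, values) triples."""
    total_size = 0
    out_indices = []
    out_values = []
    for size, indices, values in vectors:
        # Shift each index by the combined size of the preceding vectors
        out_indices.extend(total_size + i for i in indices)
        out_values.extend(values)
        total_size += size
    return total_size, out_indices, out_values


# Three hypothetical one-hot vectors of sizes 3, 2, and 4
row = [(3, [0], [1.0]), (2, [1], [1.0]), (4, [], [])]
assembled = assemble(row)
print(assembled)  # (9, [0, 4], [1.0, 1.0])
```

The third input has no active entry (as happens for a dropped last category), so it adds to the total size without contributing an index.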

In [None]:
# Use VectorAssembler to combine all one-hot encoded categorical features into a single vector column
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=[col+"_ohe" for col in categorical_variables],
    outputCol="categorical-features"
)

A Pipeline chains a sequence of transformers and estimators so that we don't have to apply each step by hand every time.

We set up a pipeline with all our steps, fit it to the data, and then use the fitted model to transform the dataset. To read more about pipelines, see the [docs](https://spark.apache.org/docs/latest/ml-pipeline.html#pipeline).
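
To picture the fit-then-transform flow, here is a toy stand-in written in plain Python. It is a simplified sketch, not the pyspark.ml API: ToyIndexer and ToyPipeline are hypothetical classes, rows are plain dictionaries, and a stage acts as both estimator and fitted model, whereas Spark returns a separate PipelineModel from fit.

```python
class ToyIndexer:
    """Stage that learns a label-to-index mapping in fit()."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col
        self.mapping = {}

    def fit(self, rows):
        # Alphabetical ordering keeps the toy example deterministic
        labels = sorted({row[self.input_col] for row in rows})
        self.mapping = {label: i for i, label in enumerate(labels)}
        return self

    def transform(self, rows):
        return [{**row, self.output_col: self.mapping[row[self.input_col]]} for row in rows]


class ToyPipeline:
    """Runs stages in order; each stage is fitted on the output of the previous ones."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        for stage in self.stages:
            rows = stage.fit(rows).transform(rows)
        return self

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


rows = [{"sex": "Male", "race": "White"}, {"sex": "Female", "race": "Black"}]
toy_pipeline = ToyPipeline([ToyIndexer("sex", "sex_idx"), ToyIndexer("race", "race_idx")])
result = toy_pipeline.fit(rows).transform(rows)
print(result)
```

The point of the design is that the stage list is declared once and then applied in the same order at fit time and at transform time.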

In [None]:
# Build and apply a PySpark pipeline to automate the transformations
from pyspark.ml import Pipeline
# Define the pipeline with all indexers, encoders, and the assembler
pipeline = Pipeline(stages=indexers + encoders + [assembler])
# Fit the pipeline to the data to create a PipelineModel
pipelineModel = pipeline.fit(adult_df)
# Transform the dataset using the fitted pipeline model
adult_df = pipelineModel.transform(adult_df)

We review the new columns created by the pipeline.

In [None]:
# Print the schema of the transformed dataframe to review new columns created by the pipeline
adult_df.printSchema()

root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_per_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- workclass-index: double (nullable = false)
 |-- education-index: double (nullable = false)
 |-- marital_status-index: double (nullable = false)
 |-- occupation-index: double (nullable = false)
 |-- relationship-index: double (nullable = false)
 |-- race-index: double (nullable = false)
 |-- sex-index: double (nullable = false)
 |-- native_country-index: double (nulla

After all the transformations, we end up with a single column that holds a sparse vector of all the encoded categorical variables.

In [None]:
# Show the 'categorical-features' column, which contains the combined one-hot encoded features as a vector
adult_df.select('categorical-features').show(truncate=False)

+---------------------------------------------------------------+
|categorical-features                                           |
+---------------------------------------------------------------+
|(86,[79],[1.0])                                                |
|(86,[0,11,24,36,47,52,56,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(86,[0,7,23,40,45,51,56,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|(86,[2,16,23,42,45,51,56,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(86,[0,8,23,36,45,52,56,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|(86,[3,8,24,37,47,51,57,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|(86,[0,13,24,33,46,51,56,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(86,[3,7,24,37,48,52,56,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|(86,[1,15,23,31,45,51,56,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(86,[0,8,24,33,48,51,57,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|(86,[0,14,23,32,45,51,56,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(86,[0,7,23,36,45,51,56,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|(86,[6,9,
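
Each row above is printed in sparse-vector notation, (size, [indices], [values]): the vector has 86 slots, and only the listed indices are non-zero. As a small illustration, the hypothetical helper to_dense below expands such a triple into a plain list, using the second row of the output as input.

```python
def to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse triple into a dense list."""
    dense = [0.0] * size
    for index, value in zip(indices, values):
        dense[index] = value
    return dense


# Second row of the output above
size = 86
indices = [0, 11, 24, 36, 47, 52, 56, 58]
values = [1.0] * len(indices)

dense = to_dense(size, indices, values)
active = [i for i, v in enumerate(dense) if v != 0.0]
print(len(dense), sum(dense))
print(active)
```

For this row, eight entries are set, one for each of the eight categorical columns.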

Now we need to encode the target label too.

In [None]:
# Encode the target label ('income') as a numeric column called 'label' using StringIndexer
indexer = StringIndexer(inputCol='income', outputCol='label')
adult_df = indexer.fit(adult_df).transform(adult_df)

### Checking Out the Spark UI
 
Here we use the Spark UI to monitor how our jobs are running and to check performance details.

We display the current Spark session info again to confirm that the session is still active.

In [None]:
%%info
# The %%info cell magic has to be the first line of the cell, so this comment follows it.
# Display the session information again to check the session after the transformations.

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1756611020725_0003,pyspark,idle,Link,Link,,✔


The session table includes links for the Spark UI and the Driver log (though the Driver log link doesn't work for us). The Spark UI link is generated when we connect to the EMR cluster. Opening it in a browser lets us inspect our Spark jobs and see how they are running.

Useful views in the Spark UI:
- The Jobs tab shows all the Spark jobs and their status.
- The Event Timeline gives a visual timeline of the different stages.
- Completed Jobs are listed in a table, and you can click on a job for more details.
- The DAG Visualization lets you see the tasks and dig into the details for each stage.

### Cleanup
 
That's it for this notebook! To move on, just close this file and head back to the lab session for the conclusion.