## Task 6: Explore and query data from the SparkMagic PySpark kernel

In this notebook, you explore and query the data using SparkMagic PySpark while connected to an Amazon EMR cluster.

In [2]:
%load_ext sagemaker_studio_analytics_extension.magics
%sm_analytics emr connect --verify-certificate False --cluster-id j-HSSHJ1JAGZEO --auth-type None --language python  

Successfully read emr cluster(j-HSSHJ1JAGZEO) details
Initiating EMR connection..
Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1722887622485_0003,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.
{"namespace": "sagemaker-analytics", "cluster_id": "j-HSSHJ1JAGZEO", "error_message": null, "success": true, "service": "emr", "operation": "connect"}


### Task 6.1: Display session information

Because you are using the PySpark kernel, you can use the PySpark magic %%info to display the current session information.

In [None]:
%%info

### Task 6.2: Explore and query the data

Although the PySpark kernel already provides a general SQLContext, this notebook uses HiveContext, which extends SQLContext. SQLContext is the general-purpose interface for working with structured data. HiveContext adds Hive-specific features on top of it: it works with Hive tables and metastores and provides additional features such as support for Hive UDFs, Hive indexing, and Hive statistics. In Spark 2.0 and later, both classes are kept for backward compatibility, and a SparkSession with Hive support enabled is the preferred entry point.

When you use the PySpark kernel, a SparkContext and a HiveContext are created automatically after you connect to an EMR cluster. You can use the HiveContext to query data in a Hive table and make it available in a Spark dataframe.

Use the HiveContext to query Hive and observe the databases and tables.

In [3]:
#query-hive

# HiveContext is built from the SparkContext (sc) that the kernel creates.
sqlContext = HiveContext(sc)

dbs = sqlContext.sql("show databases")
dbs.show()

tables = sqlContext.sql("show tables")
tables.show()

+------------+
|databaseName|
+------------+
|     default|
+------------+

+--------+----------+-----------+
|database| tableName|isTemporary|
+--------+----------+-----------+
| default|adult_data|      false|
+--------+----------+-----------+

Query the adult data table and get the data into a Spark dataframe.

In [4]:
#load-data

adult_df = sqlContext.sql("select * from adult_data").cache()

Use the dataframe to look at the shape and view the first five rows of the dataset.

In [5]:
#view-shape

print((adult_df.count(), len(adult_df.columns)))

#Show first 5 rows 
adult_df.head(5)

(1000, 15)
[Row(age='age', workclass='workclass', fnlwgt='fnlwgt', education='education', education_num='education_num', marital_status='marital_status', occupation='occupation', relationship='relationship', race='race', sex='sex', capital_gain='capital_gain', capital_loss='capital_loss', hours_per_week='hours_per_week', native_country='native_country', income='income'), Row(age='25', workclass=' Private', fnlwgt='226802', education=' 11th', education_num='7', marital_status=' Never-married', occupation=' Machine-op-inspct', relationship=' Own-child', race=' Black', sex=' Male', capital_gain='0', capital_loss='0', hours_per_week='40', native_country=' United-States', income=' <=50K'), Row(age='38', workclass=' Private', fnlwgt='89814', education=' HS-grad', education_num='9', marital_status=' Married-civ-spouse', occupation=' Farming-fishing', relationship=' Husband', race=' White', sex=' Male', capital_gain='0', capital_loss='0', hours_per_week='50', native_country=' United-States', i

For cleaner output, convert the first five rows of the Spark dataframe into a pandas dataframe.

In [6]:
#convert-dataframe

adult_df.limit(5).toPandas()

   age   workclass  fnlwgt  ...  hours_per_week  native_country  income
0  age   workclass  fnlwgt  ...  hours_per_week  native_country  income
1   25     Private  226802  ...              40   United-States   <=50K
2   38     Private   89814  ...              50   United-States   <=50K
3   28   Local-gov  336951  ...              40   United-States    >50K
4   44     Private  160323  ...              40   United-States    >50K

[5 rows x 15 columns]

Some machine learning (ML) algorithms, such as linear regression, require numeric features. The adult dataset that you use in this lab includes categorical features such as **workclass**, **education**, **occupation**, **marital status**, **relationship**, **race**, and **sex**.

The following code block illustrates how to use StringIndexer and OneHotEncoderEstimator to convert categorical variables into a set of numeric variables that take on values of 0 and 1.

- StringIndexer converts a column of string values to a column of label indices.
- OneHotEncoderEstimator maps a column of category indices to a column of binary vectors, with at most one "1" in each row that indicates the category index for that row.

One-hot encoding in Spark is a two-step process. You first use the **StringIndexer**, followed by the **OneHotEncoderEstimator**. In Spark 3.0 and later, this class is named **OneHotEncoder**.

Refer to [StringIndexer](https://spark.apache.org/docs/latest/ml-features.html#stringindexer) and [OneHotEncoder](https://spark.apache.org/docs/latest/ml-features.html#onehotencoder) for more information.
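
The two steps can be sketched in plain Python without Spark. This is only an illustration of the idea, not Spark's implementation: the helper names `fit_string_index` and `one_hot` and the sample values are made up for this example. It assumes the behavior I understand to be Spark's default, where the most frequent value gets index 0 and the encoder drops the last category, which is why a row can contain no "1" at all.

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct string to an index, most frequent value first.

    Ties are broken alphabetically here so the result is deterministic.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Turn a category index into a binary vector.

    With drop_last=True the last category is encoded as all zeros,
    so each vector holds at most one 1.0.
    """
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if int(index) < size:
        vector[int(index)] = 1.0
    return vector


# Hypothetical sample column, not taken from the lab dataset.
workclass = ["Private", "Private", "Local-gov", "Private", "Self-emp", "Local-gov"]

index = fit_string_index(workclass)
encoded = [one_hot(index[value], len(index)) for value in workclass]

for value, vector in zip(workclass, encoded):
    print(value, index[value], vector)
```

In this sketch, the least frequent value, Self-emp, receives the last index and is therefore encoded as a vector of zeros.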

In [17]:
#convert-variables

from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

categorical_variables = ['workclass', 'education', 'marital_status', 'occupation', 'relationship', 'race', 'sex', 'native_country']

indexers = [StringIndexer(inputCol=column, outputCol=column+"-index") for column in categorical_variables]

encoder = OneHotEncoderEstimator(
    inputCols=[indexer.getOutputCol() for indexer in indexers],
    outputCols=["{0}-encoded".format(indexer.getOutputCol()) for indexer in indexers]
)

The VectorAssembler class takes multiple columns as the input. It outputs a single vector column that contains the values of all input columns, concatenated in the order given.

Refer to [VectorAssembler](https://spark.apache.org/docs/latest/ml-features.html#vectorassembler) for more information about this assembler.
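
As a rough mental model, assembling is a concatenation of column values in order. The following plain-Python sketch shows that idea only. The function `assemble` and the sample row are hypothetical, and the real VectorAssembler works on Spark vector columns rather than Python lists.

```python
def assemble(row, input_cols):
    """Concatenate the values of several columns into one flat feature list."""
    features = []
    for col in input_cols:
        value = row[col]
        # A column may hold a single number or an already-encoded vector.
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features


# Hypothetical row with two encoded columns and one numeric column.
row = {
    "workclass-index-encoded": [1.0, 0.0],
    "sex-index-encoded": [0.0],
    "age": 25,
}

features = assemble(row, ["workclass-index-encoded", "sex-index-encoded", "age"])
print(features)
```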

In [16]:
#vector-assembler

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=encoder.getOutputCols(),
    outputCol="categorical-features"
)

A pipeline is an ordered list of transformers and estimators. You can define a pipeline to automate and ensure repeatability of the transformations to be applied to a dataset. In this step, you define the pipeline and then apply it to the dataset.

Similar to **StringIndexer**, a pipeline is an **estimator**. The pipeline.fit() method returns a **PipelineModel**, which is a **transformer**.

Refer to [Pipelines](https://spark.apache.org/docs/latest/ml-pipeline.html#pipeline) for more information about the machine learning pipelines.
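
The estimator and transformer roles can be illustrated with a small plain-Python sketch. All class names here (`Lowercase`, `VocabularyIndexer`, `MiniPipeline`, and so on) are invented for this example and are not part of PySpark. The sketch assumes the same contract that the text describes: fitting an estimator returns a model, the model is a transformer, and fitting a pipeline fits each stage in order on the output of the previous stage.

```python
class Lowercase:
    """A transformer: it has no state to learn, only transform()."""

    def transform(self, rows):
        return [row.lower() for row in rows]


class VocabularyIndexer:
    """An estimator: fit() learns a vocabulary and returns a model."""

    def fit(self, rows):
        vocab = {value: i for i, value in enumerate(sorted(set(rows)))}
        return VocabularyModel(vocab)


class VocabularyModel:
    """The fitted result of VocabularyIndexer, which is a transformer."""

    def __init__(self, vocab):
        self.vocab = vocab

    def transform(self, rows):
        return [self.vocab[row] for row in rows]


class MiniPipeline:
    """An estimator made of ordered stages."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted first; transformers are used as they are.
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            # Each later stage sees the output of the earlier stages.
            rows = model.transform(rows)
            fitted.append(model)
        return MiniPipelineModel(fitted)


class MiniPipelineModel:
    """The fitted pipeline: a transformer that applies every fitted stage."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


pipeline_sketch = MiniPipeline([Lowercase(), VocabularyIndexer()])
model_sketch = pipeline_sketch.fit(["Private", "Local-gov", "private"])
result = model_sketch.transform(["PRIVATE", "local-gov"])
print(result)
```

Because the fitted model stores what each stage learned, the same transformations can be replayed on new data, which is the repeatability that a pipeline is meant to provide.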

In [9]:
#pyspark-pipelines

from pyspark.ml import Pipeline

# Define the pipeline based on the stages created in previous steps.
pipeline = Pipeline(stages=indexers + [encoder, assembler])

# Fit the pipeline to the dataset to produce a PipelineModel.
pipelineModel = pipeline.fit(adult_df)

# Apply the pipeline model to the dataset.
adult_df = pipelineModel.transform(adult_df)

Review all the different columns that were created in the previous step.

In [10]:
#print-schema

adult_df.printSchema()

root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_per_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- workclass-index: double (nullable = false)
 |-- education-index: double (nullable = false)
 |-- marital_status-index: double (nullable = false)
 |-- occupation-index: double (nullable = false)
 |-- relationship-index: double (nullable = false)
 |-- race-index: double (nullable = false)
 |-- sex-index: double (nullable = false)
 |-- native_country-index: double (nulla

After applying the transformations, a single column contains a sparse vector that combines every encoded categorical variable.

In [11]:
#view-categorical-features

adult_df.select('categorical-features').show(truncate=False)

+---------------------------------------------------------------+
|categorical-features                                           |
+---------------------------------------------------------------+
|(86,[79],[1.0])                                                |
|(86,[0,11,24,36,47,52,56,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(86,[0,7,23,40,45,51,56,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|(86,[2,16,23,42,45,51,56,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(86,[0,8,23,36,45,52,56,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|(86,[3,8,24,37,47,51,57,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|(86,[0,13,24,33,46,51,56,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(86,[3,7,24,37,48,52,56,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|(86,[1,15,23,31,45,51,56,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(86,[0,8,24,33,48,51,57,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|(86,[0,14,23,32,45,51,56,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(86,[0,7,23,36,45,51,56,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|(86,[6,9,
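
Each row in this output is printed in sparse-vector notation: the vector size, then the positions of the nonzero entries, then the values at those positions. The plain-Python sketch below expands one row from the output into a dense list to make the notation concrete. The helpers `to_dense` and `to_sparse` are written for this example and are not Spark functions.

```python
def to_dense(size, indices, values):
    """Expand (size, indices, values) into a full list of floats."""
    dense = [0.0] * size
    for position, value in zip(indices, values):
        dense[position] = value
    return dense


def to_sparse(dense):
    """Compress a list of floats into (size, indices, values)."""
    indices = [i for i, value in enumerate(dense) if value != 0.0]
    values = [dense[i] for i in indices]
    return (len(dense), indices, values)


# Second row of the output above.
size = 86
indices = [0, 11, 24, 36, 47, 52, 56, 58]
values = [1.0] * 8

dense = to_dense(size, indices, values)
print(len(dense), sum(dense))
```

The eight nonzero positions correspond to the eight categorical variables, one per encoded column.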

Now, encode the target label.

In [None]:
#encode-target

indexer = StringIndexer(inputCol='income', outputCol='label')

adult_df = indexer.fit(adult_df).transform(adult_df)

### Task 6.3: Monitor and debug with the Spark UI

In this section, you use the Spark UI to monitor and inspect the performance of Spark jobs that you ran in the previous steps.

Get the current Spark session information.

In [18]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1722887622485_0003,pyspark,idle,Link,Link,,✔


You can find hyperlinks for the **Spark UI** and **Driver log**. In this lab, the **Driver log** link is inactive. The **Spark UI** presigned URL is generated at the time of connection to the EMR cluster. Choosing this link takes you to the Spark UI to inspect Spark job runs in a web browser. These metrics are helpful for performance tuning. 

Here are some of the important features to look for in the Spark UI:
- The **Jobs** tab shows the status of all the Spark jobs in this Spark application.
- Under the summary section, the **Event Timeline** section shows the various stages of the run.
- The **Completed Jobs** section is shown in a tabular format. Under the **Completed Jobs** section, you can choose a job to review information about the stages of tasks inside it.
- Using the **DAG Visualization**, you can explore the tasks that were run earlier. As with the **Event Timeline** view, you can choose a stage in the **DAG Visualization** and expand the details within that stage.

### Cleanup

You have completed this notebook. To move to the next part of the lab, do the following:

- Close this notebook file.
- Return to the lab session and continue with the **Conclusion**.