## Spark Hadoop Mpp
Yifei Wang

In [1]:
import pyspark.sql
import pyspark
import spark
from pyspark.sql import SparkSession
import pprint  

In [2]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

Step 1 – Load JSON Data

In [3]:
assessments_1 = spark.read.format('json').load("assessments.json")

Step 2 – Prey Print First JSON Object

In [4]:
pprint.pprint(assessments_1.show(1))



+--------------------+-------------+--------------------+-----------------+--------------------+-----------------+------------+--------------------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|  keen_created_at|             keen_id|   keen_timestamp|max_attempts|           sequences|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+-----------------+--------------------+-----------------+------------+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...|1516717442.735266|5a6745820eb8ab000...|1516717442.735266|         1.0|[1, [false, 2, 1,...|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
+--------------------+-------------+--------------------+-----------------+--------------------+-----------------+------------+--------------------+--------------------+--------------------+
only showing top 1 row

None


Step 3 – Recursive Walk First JSON Object

In [5]:
row = assessments_1.first()
for key in row.asDict().keys():
    app = assessments_1.select(key);
    app.registerTempTable(key);
    app.printSchema();
    app.show(1,False);

root
 |-- base_exam_id: string (nullable = true)

+------------------------------------+
|base_exam_id                        |
+------------------------------------+
|37f0a30a-7464-11e6-aa92-a8667f27e5dc|
+------------------------------------+
only showing top 1 row

root
 |-- certification: string (nullable = true)

+-------------+
|certification|
+-------------+
|false        |
+-------------+
only showing top 1 row

root
 |-- exam_name: string (nullable = true)

+-------------------------------------------+
|exam_name                                  |
+-------------------------------------------+
|Normal Forms and All That Jazz Master Class|
+-------------------------------------------+
only showing top 1 row

root
 |-- keen_created_at: string (nullable = true)

+-----------------+
|keen_created_at  |
+-----------------+
|1516717442.735266|
+-----------------+
only showing top 1 row

root
 |-- keen_id: string (nullable = true)

+------------------------+
|keen_id                 |

+------------------------+
|started_at              |
+------------------------+
|2018-01-23T14:23:19.082Z|
+------------------------+
only showing top 1 row

root
 |-- user_exam_id: string (nullable = true)

+------------------------------------+
|user_exam_id                        |
+------------------------------------+
|6d4089e4-bde5-4a22-b65f-18bce9ab79c8|
+------------------------------------+
only showing top 1 row



Step 4 – Demonstrate MPP Spark Transform


In [6]:
assessments_2 = assessments_1.select('sequences').rdd.map(lambda x: (len(x), )).toDF()
assessments_2 = assessments_2.withColumnRenamed("_1","sequences_len")
assessments_2.show()

+-------------+
|sequences_len|
+-------------+
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
+-------------+
only showing top 20 rows



Step 5 – Spark SQL in Memory Query

In [7]:
assessments_2.registerTempTable('w205')

In [8]:
assessments_3 = spark.sql("select count(*) from w205")
assessments_3.show()

+--------+
|count(1)|
+--------+
|    3280|
+--------+



Step 6 – Write to Hadoop HDFS Parquet Columnar Table and Impose Schema on Read

In [9]:
assessments_1.write.parquet("assessments.parquet")

Step 7 – Spark SQL Query against Hadoop HDFS Parquet Columnar Table

In [10]:
q7 = spark.read.parquet('assessments.parquet')
q7.registerTempTable('q7')
spark.sql('select max(base_exam_id) from q7').show(1,False)

+------------------------------------+
|max(base_exam_id)                   |
+------------------------------------+
|ffc5c454-7460-11e6-bea1-a4d18ccf3cb4|
+------------------------------------+



Step 8 – MPP Read from Hadoop HDFS Parquet Columnar Table

In [11]:
assessments_4 = spark.read.parquet('assessments.parquet')

In [12]:
type(assessments_4)

pyspark.sql.dataframe.DataFrame