# Welcome to PySpark and Amazon Elastic MapReduce (EMR)

*** 

Preliminaries

*** 

In [9]:
# PySpark Specific Machinery
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

import pyspark.sql.functions as F

The SparkSession object is your entry point to the cluster. It is the single most important object in Spark and the mechanism through which you interact with the Hadoop resources provisioned for you. Behind the session sits a SparkContext, reachable as spark.sparkContext. Every Spark "job", i.e. an individual Spark application run by a user, has a unique SparkContext that in some abstract sense 'contains' the resources allocated to that job at all times. For that reason each job has only one SparkContext: if you try to create a second one, you will typically get an error (depending on cluster configuration parameters).

In [2]:
spark = SparkSession.builder\
    .master("yarn")\
    .config("spark.driver.cores", 1)\
    .appName("demo_spark")\
    .getOrCreate()

In [3]:
spark

You can shut down the SparkSession with the call below. If you need the resources back afterwards, however, you may have to wait until the cluster has capacity available again, so be careful, especially if you suspect your co-workers overuse cluster resources.

In [4]:
# spark.stop() # shuts down the sparkcontext - but not the notebook.

# The Resilient Distributed Dataset

RDDs are the core object of study in Spark and form the backbone of the architecture that makes it work. An RDD can be thought of as a plan for creating a dataset in parallel, a dataset that will in the end exist in many pieces (partitions) spread across the nodes of the cluster. Because the dataset is built in pieces, the RDD represents the set of partition-level plans for building it from scratch in parallel. Seen this way, it makes sense that if a single node fails, Spark doesn't have to regenerate the entire dataset: it only regenerates the partitions that lived on the failed node. This fits the Hadoop infrastructure well, where work is already distributed across many machines.
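
The idea that an RDD is a set of per-partition recipes, rather than the data itself, can be sketched in plain Python. This is a toy model for intuition only, not how Spark is implemented; `ToyRDD`, `compute_partition` and the sample numbers are made-up names and values for illustration.

```python
# Toy model: an "RDD" is a list of per-partition recipes (its lineage),
# not the data itself. Hypothetical names; this is not Spark's API.

class ToyRDD:
    def __init__(self, sources, steps=()):
        self.sources = sources      # one zero-argument loader per partition
        self.steps = tuple(steps)   # functions applied to each element, in order

    def map(self, fn):
        # Adding a step only extends the plan; nothing is computed yet.
        return ToyRDD(self.sources, self.steps + (fn,))

    def compute_partition(self, index):
        # Rebuild one partition from scratch by replaying its lineage.
        rows = self.sources[index]()
        for fn in self.steps:
            rows = [fn(row) for row in rows]
        return rows

    def collect(self):
        return [self.compute_partition(i) for i in range(len(self.sources))]


# Three partitions, each with its own loader.
rdd = ToyRDD([lambda: [1, 2], lambda: [3, 4], lambda: [5, 6]]).map(lambda x: x * 10)

materialized = rdd.collect()
materialized[1] = None                       # pretend the node holding partition 1 died
materialized[1] = rdd.compute_partition(1)   # rebuild only that partition
print(materialized)                          # [[10, 20], [30, 40], [50, 60]]
```

Only partition 1 is recomputed after the simulated failure; partitions 0 and 2 are left untouched, which is the property the paragraph above describes.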

In [19]:
# Figure needed

In [21]:
# toDebugString() returns bytes in PySpark, so decode it to get readable lines
print(data.rdd.toDebugString().decode("utf-8"))

(13) MapPartitionsRDD[13] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |   MapPartitionsRDD[12] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |   MapPartitionsRDD[11] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |   FileScanRDD[10] at javaToPython at NativeMethodAccessorImpl.java:0 []


# Cluster Tuning Options

# Parallel I/O 

# Basic Dataset Operations

# DAGs and Lazy Execution


Lazy execution is a feature built into Spark, designed both to make development easier for you and to minimize overuse of cluster resources. In one sentence, it means that Spark doesn't compute anything it doesn't absolutely have to. In Spark, you write most of the program as a PLAN of WHAT you WANT to happen, and then trigger the actual execution of that plan later. Let's see how this works in practice.

In [5]:
data = spark.read.parquet('data/subset_reviews_parquet/')

In [6]:
data.explain()

== Physical Plan ==
*(1) FileScan parquet [Id#0,ProductId#1,UserId#2,ProfileName#3,HelpfulnessNumerator#4,HelpfulnessDenominator#5,Score#6,Time#7,Summary#8,Text#9] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-32-61.ec2.internal:8020/user/hadoop/data/subset_reviews_parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Id:string,ProductId:string,UserId:string,ProfileName:string,HelpfulnessNumerator:string,He...


In [13]:
best_scores = data.where(F.col('Score') == '5')

In [14]:
best_scores

DataFrame[Id: string, ProductId: string, UserId: string, ProfileName: string, HelpfulnessNumerator: string, HelpfulnessDenominator: string, Score: string, Time: string, Summary: string, Text: string]

In [15]:
best_scores.explain()

== Physical Plan ==
*(1) Project [Id#0, ProductId#1, UserId#2, ProfileName#3, HelpfulnessNumerator#4, HelpfulnessDenominator#5, Score#6, Time#7, Summary#8, Text#9]
+- *(1) Filter (isnotnull(Score#6) && (Score#6 = 5))
   +- *(1) FileScan parquet [Id#0,ProductId#1,UserId#2,ProfileName#3,HelpfulnessNumerator#4,HelpfulnessDenominator#5,Score#6,Time#7,Summary#8,Text#9] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-32-61.ec2.internal:8020/user/hadoop/data/subset_reviews_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(Score), EqualTo(Score,5)], ReadSchema: struct<Id:string,ProductId:string,UserId:string,ProfileName:string,HelpfulnessNumerator:string,He...


In [17]:
best_scores.select('Score', 'Text').show()

+-----+--------------------+
|Score|                Text|
+-----+--------------------+
|    5|These chips were ...|
|    5|these are the bes...|
|    5|"At first I was t...|
|    5|I try to be good,...|
|    5|We've long enjoye...|
|    5|"Kettle lightly s...|
|    5|Fantastic tasting...|
|    5|Great for HS lunc...|
|    5|My 8 yo is a Kett...|
|    5|This may sound re...|
|    5|I bought my first...|
|    5|A Ming Dynasty (A...|
|    5|I enjoy a number ...|
|    5|I have an okra lo...|
|    5|"My just turned 4...|
|    5|"My just turned 4...|
|    5|My 18 month old l...|
|    5|My kids love thes...|
|    5|These organic pou...|
|    5|We absolutely lov...|
+-----+--------------------+
only showing top 20 rows



Long story short, doing something to a dataset via code in this notebook doesn't materialize any real data until you force it to. You can think of successive DataFrame manipulations as simply adding steps to the plan for what will happen later. This is advantageous for the programmer as well as the cluster: you don't want to recompute the entire dataset every time you change something. You can get pretty far without materializing anything, which is useful because many of the errors you make when developing data science workflows come from simply trying to match up the right metadata.
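
The "adding steps to a plan" idea can be modelled in a few lines of plain Python. The sketch below is a toy for intuition, not Spark's implementation: `LazyFrame` and the two sample rows are invented, and the method names merely echo the `where`, `select` and `explain` calls used in the cells above.

```python
# Toy model of lazy execution (hypothetical names, not Spark's API).

class LazyFrame:
    def __init__(self, rows, plan=()):
        self._rows = rows
        self.plan = tuple(plan)     # recorded steps, not yet executed

    def where(self, predicate):
        # Transformation: returns a new object with a longer plan.
        return LazyFrame(self._rows, self.plan + (("filter", predicate),))

    def select(self, *columns):
        # Transformation: again, only the plan grows.
        return LazyFrame(self._rows, self.plan + (("project", columns),))

    def explain(self):
        return " -> ".join(["scan"] + [name for name, _ in self.plan])

    def collect(self):
        # Action: only now is the plan actually run over the rows.
        rows = self._rows
        for name, arg in self.plan:
            if name == "filter":
                rows = [r for r in rows if arg(r)]
            elif name == "project":
                rows = [{c: r[c] for c in arg} for r in rows]
        return rows


# Invented sample rows standing in for the reviews data.
reviews = LazyFrame([
    {"Score": "5", "Text": "great"},
    {"Score": "1", "Text": "stale"},
])
best = reviews.where(lambda r: r["Score"] == "5").select("Text")

print(best.explain())   # scan -> filter -> project
print(best.collect())   # [{'Text': 'great'}]
```

Building `best` touches no rows at all; the work happens only inside `collect()`, which plays the role that an action such as `show()` plays in the notebook.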

In [9]:
data = spark.read.csv('data/Subset_reviews.csv', header='true')

In [11]:
data.show()

+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
| Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|                   1|                     1|    5|1303862400|Good Quality Dog ...|I have bought sev...|
|  2|B00813GRG4|A1D87F6ZCVE5NK|              dll pa|                   0|                     0|    1|1346976000|   Not as Advertised|"Product arrived ...|
|  3|B000LQOCH0| ABXLMWJIXXAIN|"Natalia Corres "...|                   1|                     1|    4|1219017600|"""Delight"" says...|"This is a confec...|
|  4|B000UA0QIQ|A395BORC6FGVXV|                Karl|            

In [12]:
data.printSchema()

root
 |-- Id: string (nullable = true)
 |-- ProductId: string (nullable = true)
 |-- UserId: string (nullable = true)
 |-- ProfileName: string (nullable = true)
 |-- HelpfulnessNumerator: string (nullable = true)
 |-- HelpfulnessDenominator: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Text: string (nullable = true)



In [13]:
data.write.parquet('data/subset_reviews_parquet/')

In [15]:
data.groupBy('Score').count().show()

+--------------+-----+
|         Score|count|
+--------------+-----+
|       Hugs"""|    1|
|             7|    7|
|     friend"""|    1|
|            11|    1|
|             3| 8019|
|             8|    1|
| and Kitten"""|   26|
|        Dad"""|    1|
|            16|    1|
|             0|  206|
|            47|    5|
|   Comp sci"""|    1|
|          null|    2|
|             5|62148|
|            18|    1|
|       blo..."|    1|
|     Medit..."|    1|
|            17|    1|
|     Lyme ..."|    1|
|          ..."|    7|
+--------------+-----+
only showing top 20 rows
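
The stray values in the Score column above (for example `and Kitten"""`) look like fragments of quoted text fields, which suggests that some rows were split at commas inside quoted strings. That diagnosis is an assumption, since the raw file is not shown here. The snippet below uses Python's standard `csv` module and a made-up row to show how a field with doubled quotes and embedded commas stays intact under the usual CSV quoting convention but falls apart when quotes are ignored.

```python
import csv

# Hypothetical row in the same column order as the reviews file; all values are invented.
line = '7,B000000000,A000000000,"Pat ""Dog, Cat, Bird, and Kitten""",1,1,5,1300000000,Nice,Tasty'

# Splitting on every comma ignores the quotes and shifts the columns to the right.
naive = line.split(",")

# csv.reader treats "" inside a quoted field as a literal quote character.
parsed = next(csv.reader([line]))

print(len(naive))               # 13 pieces instead of 10 columns
print(naive[6])                 # the Score slot now holds a fragment of the profile name
print(len(parsed), parsed[6])   # 10 5
print(parsed[3])                # Pat "Dog, Cat, Bird, and Kitten"
```

If this is indeed the cause, Spark's CSV reader exposes `quote`, `escape` and `multiLine` options that govern the same behaviour. Setting `escape` to `"` is a common choice for files that double their quotes, though that has not been verified against this dataset.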



In [16]:
data = spark.read.parquet('data/subset_reviews_parquet')

In [18]:
data.printSchema()

root
 |-- Id: string (nullable = true)
 |-- ProductId: string (nullable = true)
 |-- UserId: string (nullable = true)
 |-- ProfileName: string (nullable = true)
 |-- HelpfulnessNumerator: string (nullable = true)
 |-- HelpfulnessDenominator: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Text: string (nullable = true)



In [19]:
data.explain()

== Physical Plan ==
*(1) FileScan parquet [Id#143,ProductId#144,UserId#145,ProfileName#146,HelpfulnessNumerator#147,HelpfulnessDenominator#148,Score#149,Time#150,Summary#151,Text#152] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-32-61.ec2.internal:8020/user/hadoop/data/subset_reviews_parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Id:string,ProductId:string,UserId:string,ProfileName:string,HelpfulnessNumerator:string,He...


In [24]:
data.groupBy('Score').count().sort(F.col('count').desc()).show()

+------------------+-----+
|             Score|count|
+------------------+-----+
|                 5|62148|
|                 4|14537|
|                 1| 9392|
|                 3| 8019|
|                 2| 5582|
|                 0|  206|
|     and Kitten"""|   26|
|   book-blogger"""|   13|
|                 6|    9|
|                 7|    7|
|              ..."|    7|
|             RN"""|    6|
|                47|    5|
|                10|    5|
|     Music Fan..."|    3|
|                14|    3|
| and book lover"""|    3|
|        swimme..."|    2|
|             a..."|    2|
|                33|    2|
+------------------+-----+
only showing top 20 rows



# Putting it All Together

Let's look at the distribution of scores.

In [22]:
data_pd = data.groupBy('Score').count().toPandas()

ImportError: Pandas >= 0.19.2 must be installed; however, your version was 0.18.1.
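
The error above means the pandas installed alongside PySpark is older than the minimum that `toPandas()` accepts. A small guard like the following can make that condition explicit before the call. It is a sketch that assumes plain dotted numeric version strings; `version_tuple` and `pandas_is_new_enough` are hypothetical helper names, and the minimum version is taken from the error message.

```python
def version_tuple(version):
    """Turn '0.19.2' into (0, 19, 2); assumes purely numeric dotted parts."""
    return tuple(int(part) for part in version.split("."))


def pandas_is_new_enough(installed, minimum="0.19.2"):
    # Tuples compare element by element, so (0, 18, 1) < (0, 19, 2).
    return version_tuple(installed) >= version_tuple(minimum)


print(pandas_is_new_enough("0.18.1"))  # False
print(pandas_is_new_enough("0.19.2"))  # True
```

In the notebook, `installed` would come from `pandas.__version__`; the actual fix is to upgrade pandas in the environment the notebook's driver runs in.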