In [1]:
import findspark

In [2]:
# make the local Spark installation importable (findspark.init) and show where it was found (findspark.find)
findspark.init()
findspark.find()

'/home/henry/spark'

- Apache Spark is often considerably faster than Hadoop MapReduce because it caches intermediate data in memory and optimizes the execution plan

In [3]:
from pyspark.sql import SparkSession

- Create an instance of `SparkSession`, which gives you access to an underlying `SparkContext` (via `spark.sparkContext`)
- A SparkContext holds a connection to the Spark cluster manager and can be used to create RDDs and broadcast variables in the cluster.

In [4]:
# create (or reuse) the entry point for programming Spark
spark = SparkSession.builder.getOrCreate()

22/10/11 11:17:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
# create an RDD from a text file; each line of the file becomes one element
records = spark.sparkContext.textFile("data/sample.txt")

In [6]:
# action: bring every element of the RDD back to the driver as a Python list
records.collect()

                                                                                

['Red,Fox,is,fast', 'red,Fox,by,Here']

In [7]:
# transformation: convert each element of RDD records to lowercase
lower_records = records.map(lambda x: x.lower())

In [8]:
# lowercase versions of the original lines
lower_records.collect()

                                                                                

['red,fox,is,fast', 'red,fox,by,here']

In [9]:
# 1-to-many transformation: each element is converted into a sequence of target elements.
# flatMap() applies split(",") to every line, then flattens the resulting lists into one RDD of words
words = lower_records.flatMap(lambda x: x.split(","))

In [10]:
# one element per word, in line order
words.collect()

['red', 'fox', 'is', 'fast', 'red', 'fox', 'by', 'here']

In [11]:
# use filter() to keep only the elements that satisfy a condition (here: words longer than 2 characters)
filterd_words = words.filter(lambda word: len(word) > 2)
filterd_words.collect()

['red', 'fox', 'fast', 'red', 'fox', 'here']

Flow:
- Turn data into a Spark abstraction (a data structure such as an RDD or a DataFrame)
- Apply transformation and actions

## 2. Web History

In [12]:
def create_pair(record):
    '''
    Split a "<url_address>,<frequency>" record into a (url_address, frequency) pair.

    The record is split on its LAST comma only, so URLs that themselves contain
    commas (e.g. in a query string) stay intact instead of breaking the unpacking.

    :param record: one line of the web-history file, e.g. "http://a.com,19779"
    :return: tuple (url_address: str, frequency: int)
    :raises ValueError: if the record contains no comma or the frequency is not an integer
    '''
    url_address, frequency = record.rsplit(",", 1)
    return url_address, int(frequency)

In [13]:
import statistics
def compute_stats(freq):
    '''
    Compute summary statistics for one group of frequencies.

    :param freq: iterable of numbers, e.g. the values grouped for one key by
        groupByKey(). It is materialised into a list once, so generators and
        other one-shot iterators are not exhausted by the first statistic.
    :return: tuple (mean, median, sample standard deviation). The sample
        standard deviation needs at least two values; for a single value it is
        returned as None instead of raising, so one singleton group cannot fail
        the whole Spark job.
    :raises statistics.StatisticsError: if freq is empty
    '''
    values = list(freq)
    avg = statistics.mean(values)
    median = statistics.median(values)
    # statistics.stdev raises StatisticsError for fewer than two data points
    sd = statistics.stdev(values) if len(values) > 1 else None
    return avg, median, sd

In [14]:
# load the web history; each line is expected to look like "<url>,<frequency>"
history = spark.sparkContext.textFile("data/web_history.txt")
history.collect()

['http://mapreduce4hackers.com,19779',
 'http://mapreduce4hackers.com,31230',
 'http://mapreduce4hackers.com,15708',
 'https://www.illumina.com,87000',
 'https://www.illumina.com,58086']

In [15]:
# Keep only well-formed "<url>,<frequency>" lines (a comma followed by an integer
# frequency) so that create_pair() can parse every surviving record.
# A bare length check such as len(record) > 5 would let malformed lines through.
valid_his = history.filter(
    lambda record: "," in record and record.rsplit(",", 1)[1].strip().isdigit()
)
valid_his.collect()

['http://mapreduce4hackers.com,19779',
 'http://mapreduce4hackers.com,31230',
 'http://mapreduce4hackers.com,15708',
 'https://www.illumina.com,87000',
 'https://www.illumina.com,58086']

In [16]:
# parse every line into a (url, frequency) pair so key-based operations can be used
pair_his = valid_his.map(create_pair)
pair_his.collect()

[('http://mapreduce4hackers.com', 19779),
 ('http://mapreduce4hackers.com', 31230),
 ('http://mapreduce4hackers.com', 15708),
 ('https://www.illumina.com', 87000),
 ('https://www.illumina.com', 58086)]

In [17]:
# gather all frequencies of each URL, then compute (mean, median, stdev) per URL.
# groupByKey is used here because the median needs every value of a key at once.
grouped_his = pair_his.groupByKey().mapValues(compute_stats)
grouped_his.collect()

[('https://www.illumina.com', (72543, 72543.0, 20445.285471227835)),
 ('http://mapreduce4hackers.com', (22239, 19779, 8048.094246466054))]

-> Make sure every function works correctly on a single record (in plain Python) before applying it in a Spark transformation

## 3. RDD (resilient distributed dataset)
RDDs support two types of operations
- **transformations**, which transform the source RDD(s) into one or more new RDDs, 
- and **actions**, which transform the source RDD(s) into a non-RDD object such as a dictionary or array

- Common transformations: map(), flatMap(), groupByKey(), reduceByKey(), filter()
- Transformations are lazy: an RDD is not evaluated until an action is performed on it

### 3.1 Transformations

In [18]:
# (key, value) pairs; some values are negative and get filtered out below
sample = [('A', 7), ('A', 8), ('A', -4),
         ('B', 3), ('B', 9), ('B', -1),
         ('C', 1), ('C', 5)]

In [19]:
# distribute the local Python list as an RDD
sample_rdd = spark.sparkContext.parallelize(sample)

In [20]:
# keep only pairs whose value (x[1]) is strictly positive
positive = sample_rdd.filter(lambda x: x[1] > 0)
positive.collect()

[('A', 7), ('A', 8), ('B', 3), ('B', 9), ('C', 1), ('C', 5)]

In [21]:
def getSumAndAvg(values):
    '''
    Return (sum, mean) of a group of numbers.

    :param values: iterable of numbers, e.g. the ResultIterable produced by
        groupByKey(). It is copied into a list first, so generators and other
        one-shot iterators (which have no len() and can only be traversed once)
        work too.
    :return: tuple (total, avg) where avg = total / number of values
    :raises ZeroDivisionError: if values is empty
    '''
    items = list(values)
    total = sum(items)
    avg = total / len(items)
    return total, avg

In [22]:
# groupByKey() gathers all values of a key into one iterable; mapValues() then
# applies getSumAndAvg to that iterable and keeps the key unchanged.
# The function is passed directly, because wrapping it in a lambda adds nothing.
sum_n_avg = positive.groupByKey().mapValues(getSumAndAvg)
sum_n_avg.collect()

[('B', (12, 6.0)), ('C', (6, 3.0)), ('A', (15, 7.5))]

In [23]:
# alternative: using reduceByKey()
# first map every value v to (v, 1) so sums and counts can be accumulated together
positive_count = positive.mapValues(lambda v: (v, 1))
positive_count.collect()

[('A', (7, 1)),
 ('A', (8, 1)),
 ('B', (3, 1)),
 ('B', (9, 1)),
 ('C', (1, 1)),
 ('C', (5, 1))]

In [24]:
# add (sum, count) pairs element-wise per key: (s1, c1) + (s2, c2) -> (s1+s2, c1+c2)
sum_count = positive_count.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
sum_count.collect()

[('B', (12, 2)), ('C', (6, 2)), ('A', (15, 2))]

In [25]:
# turn each (sum, count) into (sum, average); same result as the groupByKey version above
sum_n_avg = sum_count.mapValues(lambda x: (x[0], float(x[0])/x[1]))
sum_n_avg.collect()

[('B', (12, 6.0)), ('C', (6, 3.0)), ('A', (15, 7.5))]

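- groupByKey() sends every value of a key across the network, which can cause out-of-memory errors for large groups
- reduceByKey() combines data within each partition first -> only one partial result per key and partition is sent over the network
- Overall, reduceByKey() is more scalable than groupByKey()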

### 3.2 Actions

1. Spark actions produce non-RDD values
- reduce() -> single value
- collect() -> list of all elements (returned to the driver)
- count() -> number of elements of the RDD
- saveAsTextFile() -> save to disk
- collectAsMap() -> return an RDD\[(K, V)\] to the driver as dict\[K, V\]

-> Avoid calling collect() on large RDDs: it brings every element to the driver and can exhaust its memory. Use take(n) or takeSample() to inspect a subset instead.

In [29]:
# take(n) returns only the first n elements to the driver, unlike collect()
sum_count.take(3)

[('B', (12, 2)), ('C', (6, 2)), ('A', (15, 2))]

## 4. DataFrame

### 4.1 Read to DataFrame

In [30]:
virus_path = "data/infection.txt"
# CSV with a header row; Spark infers the column types (e.g. confirmed -> integer)
sdf_case = spark.read.csv(virus_path, sep=",", header=True, inferSchema=True)

In [32]:
# display the first 3 rows
sdf_case.show(3)

+-------+-------+----------+--------------+---------+
|case_id|country|      city|infection_case|confirmed|
+-------+-------+----------+--------------+---------+
|  C0001|    USA|  New York|       contact|      175|
|  C0008|    USA|New Jersey|       unknown|       25|
|  C0009|    USA| Cupertino|       contact|      100|
+-------+-------+----------+--------------+---------+



In [33]:
# column names and the types inferred from the CSV
sdf_case.printSchema()

root
 |-- case_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- confirmed: integer (nullable = true)



### 4.2 Basic Functions

In [34]:
from pyspark.sql import functions as F

In [36]:
# rows ordered from the highest to the lowest number of confirmed cases
sdf_case.sort(F.desc("confirmed")).show() # sort by confirmed column descending

+-------+-------+----------+--------------+---------+
|case_id|country|      city|infection_case|confirmed|
+-------+-------+----------+--------------+---------+
|  C0001|    USA|  New York|       contact|      175|
|  C0009|    USA| Cupertino|       contact|      100|
|  C0008|    USA|New Jersey|       unknown|       25|
+-------+-------+----------+--------------+---------+



In [37]:
# keep USA rows with more than 50 confirmed cases
sdf_case.filter(
    (F.col("confirmed") > 50) & (F.col("country") == "USA")
).show()

+-------+-------+---------+--------------+---------+
|case_id|country|     city|infection_case|confirmed|
+-------+-------+---------+--------------+---------+
|  C0001|    USA| New York|       contact|      175|
|  C0009|    USA|Cupertino|       contact|      100|
+-------+-------+---------+--------------+---------+



### 4.3 Basic Example

- Create a DataFrame and find the average and sum of hours worked by employees per department

In [38]:
# (department, employee name, hours worked) records
dept_emps = [("Sales", "Barb", 40), ("Sales", "Dan", 20),
             ("IT", "Alex", 22), ("IT", "Jane", 24),
             ("HR", "Alex", 20), ("HR", "Mary", 30)]

In [39]:
# build a DataFrame from the tuples; column names are assigned positionally
sdf_emps = spark.createDataFrame(dept_emps, schema=["dept", "name", "hours"])

In [40]:
# display the first 3 employee rows
sdf_emps.show(3)

+-----+----+-----+
| dept|name|hours|
+-----+----+-----+
|Sales|Barb|   40|
|Sales| Dan|   20|
|   IT|Alex|   22|
+-----+----+-----+
only showing top 3 rows



In [43]:
sdf_avg_emps = (
    sdf_emps
    .groupBy("dept")
    .agg(
        F.avg("hours").alias("average"),  # mean hours per department
        F.sum("hours").alias("total"),    # total hours per department
    )
)

In [44]:
# one row per department with its average and total hours
sdf_avg_emps.show()

+-----+-------+-----+
| dept|average|total|
+-----+-------+-----+
|Sales|   30.0|   60|
|   HR|   25.0|   50|
|   IT|   23.0|   46|
+-----+-------+-----+



- Animal example: aggregating (animal, value) pairs with RDD operations

In [45]:
# (animal, value) pairs; each animal appears more than once
data = [("fox", 6), ("dog", 5), ("fox", 3), ("dog", 8),
        ("cat", 1), ("cat", 2), ("cat", 3), ("cat", 4)]

In [46]:
# distribute the local list as a pair RDD
rdd_animal = spark.sparkContext.parallelize(data)

In [47]:
# first 3 pairs only
rdd_animal.take(3)

[('fox', 6), ('dog', 5), ('fox', 3)]

In [49]:
rdd_animal.count() # number of (animal, value) records in the RDD, not the number of distinct animals

8

In [50]:
# sum the values of each animal; values are combined within each partition before the shuffle
rdd_sum = rdd_animal.reduceByKey(lambda x, y: x+y)
rdd_sum.collect()

[('fox', 9), ('dog', 13), ('cat', 10)]

In [51]:
# keep only animals whose summed value is below 12
rdd_filterd_sum = rdd_sum.filter(lambda x: x[1] < 12)
rdd_filterd_sum.collect()

[('fox', 9), ('cat', 10)]

In [52]:
# Grouping similar keys in depth:
# groupByKey() yields (key, ResultIterable) pairs, so collect() shows iterable objects rather than the values
grouped = rdd_animal.groupByKey()
grouped.collect()

[('fox', <pyspark.resultiterable.ResultIterable at 0x7f66f2a97d50>),
 ('dog', <pyspark.resultiterable.ResultIterable at 0x7f66f2a97d90>),
 ('cat', <pyspark.resultiterable.ResultIterable at 0x7f66f2a97850>)]

In [56]:
# mapValues(list) turns each ResultIterable into a plain list and leaves the key untouched
# (unlike map(), it also preserves the RDD's partitioning)
grouped.mapValues(list).collect()

[('fox', [6, 3]), ('dog', [5, 8]), ('cat', [1, 2, 3, 4])]

In [57]:
# aggregating values: sum each key's grouped values.
# This gives the same result as rdd_sum (reduceByKey), but groupByKey ships every value across the network first.
aggregated = grouped.mapValues(sum)
aggregated.collect()

[('fox', 9), ('dog', 13), ('cat', 10)]

## 5. ETL Example

- ETL is the general procedure of copying data from one or more sources into a destination system that represents the data differently from the source(s) or in a different context than the source(s)

Let’s define our ETL process:
1. Extraction
- First, we create a DataFrame from a given JSON document.
2. Transformation
- Then we filter the data and keep the records for seniors (age > 54). Next, we add a new column, total, which is the total of males and females.
3. Loading
- Finally, we write the revised DataFrame into a MySQL database and verify the load process.

In [59]:
# number of lines in the file (one JSON record per line)
!wc -l data/census_2010.json

101 data/census_2010.json


In [60]:
# peek at the first 5 records
!head -5 data/census_2010.json

{"females": 1994141, "males": 2085528, "age": 0, "year": 2010}
{"females": 1997991, "males": 2087350, "age": 1, "year": 2010}
{"females": 2000746, "males": 2088549, "age": 2, "year": 2010}
{"females": 2002756, "males": 2089465, "age": 3, "year": 2010}
{"females": 2004366, "males": 2090436, "age": 4, "year": 2010}


### 5.1 Extraction

In [62]:
# Extraction: by default spark.read.json expects one JSON object per line
census_path = "data/census_2010.json"
sdf_census = spark.read.json(census_path)
sdf_census.count()

101

In [63]:
# display the first 3 rows (columns are ordered alphabetically by name)
sdf_census.show(3)

+---+-------+-------+----+
|age|females|  males|year|
+---+-------+-------+----+
|  0|1994141|2085528|2010|
|  1|1997991|2087350|2010|
|  2|2000746|2088549|2010|
+---+-------+-------+----+
only showing top 3 rows



### 5.2 Transformation

In [64]:
# seniors = rows whose age is strictly greater than 54
sdf_seniors = sdf_census.filter(sdf_census["age"] > 54)

In [65]:
# number of senior rows
sdf_seniors.count()

46

In [66]:
# display the first 3 senior rows
sdf_seniors.show(3)

+---+-------+-------+----+
|age|females|  males|year|
+---+-------+-------+----+
| 55|2167706|2059204|2010|
| 56|2106460|1989505|2010|
| 57|2048896|1924113|2010|
+---+-------+-------+----+
only showing top 3 rows



In [67]:
# total = males + females for each row. The column expression is passed directly:
# F.lit() is meant for wrapping literal Python values and is redundant around a Column.
sdf_seniors_final = sdf_seniors.withColumn("total",
                                           sdf_seniors.males + sdf_seniors.females)

In [68]:
# first 3 rows including the new total column
sdf_seniors_final.show(3)

+---+-------+-------+----+-------+
|age|females|  males|year|  total|
+---+-------+-------+----+-------+
| 55|2167706|2059204|2010|4226910|
| 56|2106460|1989505|2010|4095965|
| 57|2048896|1924113|2010|3973009|
+---+-------+-------+----+-------+
only showing top 3 rows



### 5.3 Loading

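- Write the `sdf_seniors_final` DataFrame into a MySQL table (the cells below are commented out; running them requires a reachable MySQL server at the configured JDBC URL and the MySQL JDBC driver on Spark's classpath)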

In [None]:
# Loading step: write sdf_seniors_final to the table "seniors" in the MySQL database testdb via JDBC.
# Kept commented out because it needs a reachable MySQL server at the URL below and the
# MySQL JDBC driver on Spark's classpath.
# mode("overwrite") replaces any existing contents of the target table.
# Credentials are read from environment variables (requires `import os`) instead of being
# hardcoded in the notebook.
# NOTE(review): with MySQL Connector/J 8.x the driver class is "com.mysql.cj.jdbc.Driver";
# "com.mysql.jdbc.Driver" is the legacy name. Confirm against the installed connector.
# sdf_seniors_final\
# .write\
# .format("jdbc")\
# .option("driver", "com.mysql.jdbc.Driver")\
# .mode("overwrite")\
# .option("url", "jdbc:mysql://localhost/testdb")\
# .option("dbtable", "seniors")\
# .option("user", os.environ["MYSQL_USER"])\
# .option("password", os.environ["MYSQL_PASSWORD"])\
# .save()

In [None]:
# Verify the load from a MySQL shell (example session, not executed by this notebook):
# $ mysql -uroot -p
# Enter password: <password>
# Your MySQL connection id is 9
# Server version: 5.7.30 MySQL Community Server (GPL)
# mysql> use testdb;
# Database changed
# mysql> select * from seniors;