# __Learning Spark__ (A quick beginner's guide)
Big data analysis with Spark has been one of the most fun topics I've learned in my Master's program, and I want to share all of the goodness!
Apache Spark is a FAST cluster computing platform and a go-to tool for processing and analyzing BIG datasets. A single machine often doesn't have enough memory or compute power to run computations on very large amounts of data. A cluster is a group of computers whose resources are pooled so that we can use them as if they were a single computer. Spark coordinates the work across these computers.

The resource that will teach you the most is: https://www.oreilly.com/library/view/spark-the-definitive/9781491912201/
## Spark Concepts
A Spark cluster is where our Spark job executes. A Spark application has a driver, which runs your program and launches parallel operations on the cluster, and worker nodes, which run the executors that carry out the analysis in parallel. The driver program is basically in the driver's seat and manages the workers.
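
To make the driver/worker split concrete, here is a plain-Python analogy that runs without Spark. It is only a sketch: a thread pool stands in for the executors, and the names `partition`, `worker_task`, and `driver_sum_of_squares` are made up for illustration and are not Spark APIs.

```python
from concurrent.futures import ThreadPoolExecutor


def partition(data, num_partitions):
    """Split a list into roughly equal chunks, loosely like Spark partitions."""
    return [data[i::num_partitions] for i in range(num_partitions)]


def worker_task(chunk):
    """Work done on one partition: the sum of squares of its elements."""
    return sum(x * x for x in chunk)


def driver_sum_of_squares(data, num_workers=2):
    """The 'driver' hands partitions to workers and combines the partial results."""
    chunks = partition(data, num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partial_sums = list(pool.map(worker_task, chunks))
    return sum(partial_sums)


total = driver_sum_of_squares([1, 2, 3, 4])
print(total)  # 30
```

Real Spark executors are separate processes, usually on other machines, but the shape is the same: split the data, work on the pieces in parallel, combine the results.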

# Let's code! 

In [1]:
# The SparkContext object gives the driver program access to Spark and acts as its connection to the computing cluster.

# Import Dependencies
import os
import sys
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

from pyspark import SparkContext
from pyspark.sql import SparkSession

# Using SparkContext vs SparkSession
# Creating a SparkContext instance (driver program) to control your Spark application.
# Note: SparkSession (below) is the newer, recommended entry point, so you may not need to create a SparkContext yourself.
sc = SparkContext()

# You can also create a SparkSession instance (driver program) to control your Spark application. 
spark = SparkSession.builder.appName('Practice').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/29 06:34:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Resilient Distributed Datasets (RDD) - What are they? 
* An RDD is an immutable, distributed collection of elements that is split into partitions, which may be computed on different nodes.
* RDD operations are either Transformations, which lazily define a new RDD, or Actions, which trigger the computation and return a result to the driver (or write it to storage).
* Almost all of the code you run on PySpark DataFrames is ultimately executed as RDD operations; however, DataFrames are usually more efficient than hand-written RDD code because Spark can optimize the query plan for you.
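
The lazy-transformation idea can be sketched in plain Python, without Spark. This is only an analogy: Python's built-in `map` is lazy in the same spirit, so building the pipeline does no work until something consumes it. The `calls` list and `square` function are illustrative names.

```python
calls = []  # records every time the mapping function actually runs


def square(x):
    calls.append(x)
    return x * x


numbers = [1, 2, 3, 4]

# "Transformation": builds a lazy pipeline, nothing is computed yet.
lazy_squares = map(square, numbers)
calls_before_action = list(calls)

# "Action": consuming the pipeline forces the computation.
result = list(lazy_squares)

print(calls_before_action)  # []
print(result)               # [1, 4, 9, 16]
```

In Spark, `map()`, `flatMap()` and `filter()` play the role of the lazy step, while `collect()`, `count()` and `reduce()` force the work to happen.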

## Transformations return a new RDD
## Examples

In [2]:
# Let's create an RDD!
import numpy as np
text_array = np.array([["The Cat In The Hat"],["Hop On Pop"],["Are You My Mother?"],["Green Eggs And Ham"]])
rdd = sc.parallelize(text_array).persist() # persist() caches the RDD once it is computed, so later actions can reuse it instead of recomputing it.

# map()
### Applies a function to each element of the current RDD and returns a new RDD

In [3]:
rdd.map(lambda title: title[0].split(" ")).collect()

                                                                                

[['The', 'Cat', 'In', 'The', 'Hat'],
 ['Hop', 'On', 'Pop'],
 ['Are', 'You', 'My', 'Mother?'],
 ['Green', 'Eggs', 'And', 'Ham']]

# flatMap()
## Similar to map, but each input item can be mapped to 0 or more output items (so the function should return an iterable, such as a list, rather than a single item).

In [4]:
# Flatten all lists into a single list of words
rdd.flatMap(lambda title: title[0].split(" ")).collect()

['The',
 'Cat',
 'In',
 'The',
 'Hat',
 'Hop',
 'On',
 'Pop',
 'Are',
 'You',
 'My',
 'Mother?',
 'Green',
 'Eggs',
 'And',
 'Ham']
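
The difference between `map()` and `flatMap()` can be reproduced in plain Python. This is a sketch without Spark, using a list comprehension and `itertools.chain` as stand-ins; the `titles` list is a shortened copy of the data above.

```python
from itertools import chain

titles = ["The Cat In The Hat", "Hop On Pop"]

# map-like: exactly one output element (a list of words) per input title.
mapped = [title.split(" ") for title in titles]

# flatMap-like: each title yields zero or more words, flattened into one list.
flat_mapped = list(chain.from_iterable(title.split(" ") for title in titles))

print(mapped)       # [['The', 'Cat', 'In', 'The', 'Hat'], ['Hop', 'On', 'Pop']]
print(flat_mapped)  # ['The', 'Cat', 'In', 'The', 'Hat', 'Hop', 'On', 'Pop']
```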

# filter()
## If you are familiar with SQL, this is almost like applying a WHERE clause. filter() returns a new RDD containing only the elements of the source RDD for which the function returns true.

In [5]:
# Transformations are lazily evaluated, so we can chain them together; nothing runs until collect() is called.
# Here, we are looking for the words from the title that start with 'M' 
rdd.flatMap(lambda title: title[0].split(" ")).filter(lambda word: word[0]=="M").collect()
## Uncomment below to see all of the words which have 'a' as the second letter in the word. 
# rdd.flatMap(lambda title: title[0].split(" ")).filter(lambda word: word[1]=="a").collect()

['My', 'Mother?']

In [6]:
# Could also use 'startswith' 
rdd.flatMap(lambda x: x[0].split(" ")).filter(lambda word: word.startswith("M")).collect()


['My', 'Mother?']
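
The two filters above give the same result on this data, but they behave differently on edge cases. As a small plain-Python sketch (the input string is made up for illustration): splitting on a single space can produce empty strings, and indexing an empty string fails where `startswith` does not.

```python
words = "My  Mother".split(" ")  # two spaces produce an empty string
print(words)  # ['My', '', 'Mother']

# startswith() is safe on empty strings.
safe = [w for w in words if w.startswith("M")]
print(safe)  # ['My', 'Mother']

# Indexing is not: "" has no character at position 0.
try:
    unsafe = [w for w in words if w[0] == "M"]
except IndexError:
    unsafe = None
print(unsafe)  # None
```

For messy text, `startswith` is therefore the more forgiving choice inside a `filter()`.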

# Joins! Joining RDDs

In [7]:
# Let's load some files!
rdd = sc.textFile("./people.csv")
rdd2 = sc.textFile("./cust_foods.csv")
rdd.collect()
#print(rdd2.collect())

['Id,First_Name,Last_Name',
 '1,Homer,Hill',
 '2,Margie,Hill',
 '3,Bartholomew ,Hill',
 '4,Hank,Simpson',
 '5,Peggy,Simpson',
 '6,Bobby ,Simpson']

In [8]:
# Assume the first line is the header
#header = rdd.first()

# Filter out the header to avoid processing it as data
#data = rdd.filter(lambda line: line != header)

rdd_split=rdd.map(lambda line: line.split(","))
rdd2_split=rdd2.map(lambda line: line.split(","))

In [9]:
rdd_split.collect()

[['Id', 'First_Name', 'Last_Name'],
 ['1', 'Homer', 'Hill'],
 ['2', 'Margie', 'Hill'],
 ['3', 'Bartholomew ', 'Hill'],
 ['4', 'Hank', 'Simpson'],
 ['5', 'Peggy', 'Simpson'],
 ['6', 'Bobby ', 'Simpson']]

In [10]:
rdd2_split.collect()

[['cust_id', 'Item', 'Cost', 'Qty'],
 ['1', 'Donuts', '7.99', '2'],
 ['2', 'Bourbon', '15.49', '1'],
 ['3', 'Slushies', '4.99', '1'],
 ['4', 'Steak', '18.95', '3'],
 ['5', 'Turkey', '15.67', '1'],
 ['6', 'Deli Meats', '5.99', '5']]

## Before we join, we need to transform the RDD into (Key, Value) pairs

In [11]:
# Transforming to key value pairs
rdd_pairs = rdd_split.map(lambda x: (x[0], (x[1], x[2])))
rdd2_pairs = rdd2_split.map(lambda x: (x[0], (x[1], x[2], x[3])))

In [12]:
# Perform a join
joined_rdd=rdd_pairs.join(rdd2_pairs)
joined_rdd.collect()

[('4', (('Hank', 'Simpson'), ('Steak', '18.95', '3'))),
 ('3', (('Bartholomew ', 'Hill'), ('Slushies', '4.99', '1'))),
 ('6', (('Bobby ', 'Simpson'), ('Deli Meats', '5.99', '5'))),
 ('1', (('Homer', 'Hill'), ('Donuts', '7.99', '2'))),
 ('2', (('Margie', 'Hill'), ('Bourbon', '15.49', '1'))),
 ('5', (('Peggy', 'Simpson'), ('Turkey', '15.67', '1')))]
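
To see what the join produces, here is a plain-Python sketch of an inner join on (key, value) pairs. It runs without Spark; `inner_join` is an illustrative helper, not a Spark API, and the sample rows are a subset of the data above.

```python
def inner_join(left_pairs, right_pairs):
    """Join two lists of (key, value) pairs on matching keys."""
    right_by_key = {}
    for key, value in right_pairs:
        right_by_key.setdefault(key, []).append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left_pairs
        for right_value in right_by_key.get(key, [])
    ]


people = [("Id", ("First_Name", "Last_Name")),
          ("1", ("Homer", "Hill")),
          ("4", ("Hank", "Simpson"))]
foods = [("cust_id", ("Item", "Cost", "Qty")),
         ("1", ("Donuts", "7.99", "2")),
         ("4", ("Steak", "18.95", "3"))]

joined = inner_join(people, foods)
for row in joined:
    print(row)
# ('1', (('Homer', 'Hill'), ('Donuts', '7.99', '2')))
# ('4', (('Hank', 'Simpson'), ('Steak', '18.95', '3')))
```

Notice that the header rows disappear: their keys (`'Id'` and `'cust_id'`) do not match, so an inner join drops them. That is why the joined output above has no header even though the header-filtering code was commented out. It only works by coincidence, so filtering the header explicitly is the safer habit.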

## READY SET ACTION! 
### Actions trigger the computation and return a result to the driver

## Reduce

In [13]:
# Sum of squares example! 
# Create an RDD of numbers
rdd_ss = sc.parallelize([1,2,3,4])
# Square each using map 
squared_rdd = rdd_ss.map(lambda x: x*x)
# Reduce: add up all of the squared numbers
reduced = squared_rdd.reduce(lambda x,y: x+y)
reduced

30
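
One detail worth knowing: Spark's documentation asks for the function passed to `reduce()` to be commutative and associative, because each partition is reduced separately before the partial results are combined. The plain-Python sketch below simulates that with two hand-made partitions; `reduce_in_partitions` is an illustrative helper, not a Spark function.

```python
from functools import reduce


def reduce_in_partitions(func, partitions):
    """Reduce each partition on its own, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions]
    return reduce(func, partials)


def add(x, y):
    return x + y


def sub(x, y):
    return x - y


squares = [1, 4, 9, 16]
partitions = [[1, 4], [9, 16]]

# Addition gives the same answer however the data is grouped.
print(reduce(add, squares))                   # 30
print(reduce_in_partitions(add, partitions))  # 30

# Subtraction does not, so it is a poor choice for reduce().
print(reduce(sub, squares))                   # -28
print(reduce_in_partitions(sub, partitions))  # 4
```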

In [14]:
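# reduceByKey() merges the values of each key using the given function.
# Note: reduceByKey() is itself a transformation; collect() is the action that returns the result.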
heart_rdd = sc.textFile('./heart.txt')
heart_flat=heart_rdd.flatMap(lambda x: x.split(" "))
heart = heart_flat.map(lambda x:(x,1))
heart.reduceByKey(lambda x,y:x+y).collect()

[('now', 2),
 ('always', 1),
 ('got', 1),
 ('own', 1),
 ('never', 1),
 ('really', 1),
 ('cared', 1),
 ('until', 1),
 ('And', 1),
 ('do', 2),
 ('Till', 1),
 ('I', 5),
 ('by', 1),
 ('on', 1),
 ('my', 1),
 ('met', 1),
 ('you', 3),
 ('it', 1),
 ('chills', 1),
 ('me', 1),
 ('to', 1),
 ('the', 1),
 ('bone', 1),
 ('How', 2),
 ('get', 2),
 ('alone', 2)]

In [15]:
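# countByKey()
# An action that returns the number of elements for each key as a dictionary.
# Every value here is 1, so the counts match the reduceByKey() sums above.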
heart.countByKey()

defaultdict(int,
            {'Till': 1,
             'now': 2,
             'I': 5,
             'always': 1,
             'got': 1,
             'by': 1,
             'on': 1,
             'my': 1,
             'own': 1,
             'never': 1,
             'really': 1,
             'cared': 1,
             'until': 1,
             'met': 1,
             'you': 3,
             'And': 1,
             'it': 1,
             'chills': 1,
             'me': 1,
             'to': 1,
             'the': 1,
             'bone': 1,
             'How': 2,
             'do': 2,
             'get': 2,
             'alone': 2})
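
The two approaches agree here only because every value is 1. A plain-Python sketch (no Spark; `reduce_by_key` is an illustrative helper and the sample pairs are made up) shows where they diverge:

```python
from collections import Counter


def reduce_by_key(func, pairs):
    """Merge the values that share a key using a binary function."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


# Word-count style pairs: every value is 1, so both approaches agree.
word_pairs = [(w, 1) for w in ["I", "you", "I", "alone", "you", "I"]]
word_sums = reduce_by_key(lambda x, y: x + y, word_pairs)
word_counts = dict(Counter(key for key, _ in word_pairs))
print(word_sums)    # {'I': 3, 'you': 2, 'alone': 1}
print(word_counts)  # {'I': 3, 'you': 2, 'alone': 1}

# With other values, summing by key and counting by key differ.
sales_pairs = [("a", 5), ("a", 7), ("b", 2)]
sales_sums = reduce_by_key(lambda x, y: x + y, sales_pairs)
sales_counts = dict(Counter(key for key, _ in sales_pairs))
print(sales_sums)    # {'a': 12, 'b': 2}
print(sales_counts)  # {'a': 2, 'b': 1}
```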

In [16]:
# Counting the items in the RDD
rdd_ss.count()

4

In [17]:
rdd_ss.min()

1

In [18]:
rdd_ss.max()

4

## Using DataFrames in Spark!

In [19]:
df = spark.read.csv('./cust_foods.csv', inferSchema=True, header=True)

In [20]:
df.show()

+-------+----------+-----+---+
|cust_id|      Item| Cost|Qty|
+-------+----------+-----+---+
|      1|    Donuts| 7.99|  2|
|      2|   Bourbon|15.49|  1|
|      3|  Slushies| 4.99|  1|
|      4|     Steak|18.95|  3|
|      5|    Turkey|15.67|  1|
|      6|Deli Meats| 5.99|  5|
+-------+----------+-----+---+



## Aggregations 

In [26]:
from pyspark.sql.functions import countDistinct, avg, stddev 
df.select(stddev('Cost')).show()

+-----------------+
|     stddev(Cost)|
+-----------------+
|5.896960799146172|
+-----------------+



In [27]:
df.select(avg('Cost')).show()

+------------------+
|         avg(Cost)|
+------------------+
|11.513333333333334|
+------------------+



In [28]:
df.select(countDistinct('Cost')).show()

+--------------------+
|count(DISTINCT Cost)|
+--------------------+
|                   6|
+--------------------+



## Filter

In [21]:
df.filter(df['Cost'] < 8).select(['Item', 'Cost']).show()

+----------+----+
|      Item|Cost|
+----------+----+
|    Donuts|7.99|
|  Slushies|4.99|
|Deli Meats|5.99|
+----------+----+



In [22]:
# Select rows with cost price less than 16 and cost greater than 7 
df.filter((df['Cost'] < 16) & (df['Cost'] > 7)).select(['Item', 'Cost']).show()

+-------+-----+
|   Item| Cost|
+-------+-----+
| Donuts| 7.99|
|Bourbon|15.49|
| Turkey|15.67|
+-------+-----+



## Adding New Columns

In [23]:
df.withColumn("Total", df['Cost'] * df['Qty']).show()

+-------+----------+-----+---+------------------+
|cust_id|      Item| Cost|Qty|             Total|
+-------+----------+-----+---+------------------+
|      1|    Donuts| 7.99|  2|             15.98|
|      2|   Bourbon|15.49|  1|             15.49|
|      3|  Slushies| 4.99|  1|              4.99|
|      4|     Steak|18.95|  3|56.849999999999994|
|      5|    Turkey|15.67|  1|             15.67|
|      6|Deli Meats| 5.99|  5|29.950000000000003|
+-------+----------+-----+---+------------------+
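
The long decimals in the `Total` column (for example 56.849999999999994) are ordinary floating-point behavior rather than a Spark problem. The plain-Python sketch below reproduces the same value and shows two common remedies, rounding and decimal arithmetic. In Spark, one option would be wrapping the expression in `pyspark.sql.functions.round`; the code here deliberately stays in plain Python so it runs anywhere.

```python
from decimal import Decimal

# Binary floating point cannot represent 18.95 exactly.
total_float = 18.95 * 3
print(total_float)            # 56.849999999999994
print(total_float == 56.85)   # False

# Remedy 1: round for display or comparison.
total_rounded = round(total_float, 2)
print(total_rounded)          # 56.85

# Remedy 2: use decimal arithmetic when exact cents matter.
total_decimal = Decimal("18.95") * 3
print(total_decimal)          # 56.85
```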



## sort and orderBy

In [24]:
df.sort(df['Qty']).show()
df.orderBy(df['Item']).show()

+-------+----------+-----+---+
|cust_id|      Item| Cost|Qty|
+-------+----------+-----+---+
|      2|   Bourbon|15.49|  1|
|      3|  Slushies| 4.99|  1|
|      5|    Turkey|15.67|  1|
|      1|    Donuts| 7.99|  2|
|      4|     Steak|18.95|  3|
|      6|Deli Meats| 5.99|  5|
+-------+----------+-----+---+

+-------+----------+-----+---+
|cust_id|      Item| Cost|Qty|
+-------+----------+-----+---+
|      2|   Bourbon|15.49|  1|
|      6|Deli Meats| 5.99|  5|
|      1|    Donuts| 7.99|  2|
|      3|  Slushies| 4.99|  1|
|      4|     Steak|18.95|  3|
|      5|    Turkey|15.67|  1|
+-------+----------+-----+---+



## Using Spark SQL! 

In [81]:
# Load the CSV and register it as a SQL temp view called cars
car_df = spark.read.csv('cars.csv', inferSchema=True, header=True)
# Drop columns 
car_df = car_df.drop("_c4")
car_df = car_df.drop("_c5")
car_df = car_df.drop("_c6")
car_df.createOrReplaceTempView('cars')


In [82]:
# Now we can use pure SQL to query through
query1 = spark.sql("SELECT * FROM cars")
query1.show()

+---+------------+------+------+
| Id|         car|person|  cost|
+---+------------+------+------+
|  1|    Corvette|  Katy|100000|
|  2|      Camero|  Ryan| 50000|
|  3|Lamborghini | Kevin|160000|
|  4|    Ferarri |  Ryan|200000|
|  5|          GT|  Katy|  NULL|
|  6|     Mustang| Kevin| 50000|
+---+------------+------+------+



In [83]:
query2 = spark.sql("SELECT * FROM cars WHERE cost < 100000")
query2.show()

+---+-------+------+-----+
| Id|    car|person| cost|
+---+-------+------+-----+
|  2| Camero|  Ryan|50000|
|  6|Mustang| Kevin|50000|
+---+-------+------+-----+



## Group By

In [92]:
query3 = spark.sql("SELECT person,avg(cost) FROM cars GROUP BY person")
query3.show()

+------+---------+
|person|avg(cost)|
+------+---------+
| Kevin| 105000.0|
|  Ryan| 125000.0|
|  Katy| 100000.0|
+------+---------+



In [90]:
# With DataFrames: group by person and compute the average cost of each person's cars
car_df.groupBy("person").avg("cost").show()

+------+---------+
|person|avg(cost)|
+------+---------+
| Kevin| 105000.0|
|  Ryan| 125000.0|
|  Katy| 100000.0|
+------+---------+
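
Katy's average is 100000.0 even though one of her rows has a NULL cost, because `avg` skips NULL values instead of treating them as zero. A plain-Python sketch of the same grouping logic, using the rows from the `cars` table with `None` standing in for NULL (`average_by_key` is an illustrative helper):

```python
from collections import defaultdict

rows = [("Katy", 100000), ("Ryan", 50000), ("Kevin", 160000),
        ("Ryan", 200000), ("Katy", None), ("Kevin", 50000)]


def average_by_key(rows):
    """Average the values per key, ignoring missing (None) values."""
    groups = defaultdict(list)
    for person, cost in rows:
        costs = groups[person]  # create the group even if the cost is missing
        if cost is not None:
            costs.append(cost)
    return {
        person: (sum(costs) / len(costs) if costs else None)
        for person, costs in groups.items()
    }


averages = average_by_key(rows)
print(averages)  # {'Katy': 100000.0, 'Ryan': 125000.0, 'Kevin': 105000.0}
```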



## Joins

In [85]:
# Inner Join
people_df = spark.read.csv('people.csv', inferSchema=True, header=True)
food_df = spark.read.csv('cust_foods.csv', inferSchema=True, header=True)
people_df.createOrReplaceTempView('people')
food_df.createOrReplaceTempView('cust_food')

In [86]:
people_df.show()
food_df.show()

+---+------------+---------+
| Id|  First_Name|Last_Name|
+---+------------+---------+
|  1|       Homer|     Hill|
|  2|      Margie|     Hill|
|  3|Bartholomew |     Hill|
|  4|        Hank|  Simpson|
|  5|       Peggy|  Simpson|
|  6|      Bobby |  Simpson|
+---+------------+---------+

+-------+----------+-----+---+
|cust_id|      Item| Cost|Qty|
+-------+----------+-----+---+
|      1|    Donuts| 7.99|  2|
|      2|   Bourbon|15.49|  1|
|      3|  Slushies| 4.99|  1|
|      4|     Steak|18.95|  3|
|      5|    Turkey|15.67|  1|
|      6|Deli Meats| 5.99|  5|
+-------+----------+-----+---+



In [96]:
# Performing Inner Join
table = spark.sql("SELECT * FROM people as p INNER JOIN cust_food as f ON p.id = f.cust_id")
table.show()

+---+------------+---------+-------+----------+-----+---+
| Id|  First_Name|Last_Name|cust_id|      Item| Cost|Qty|
+---+------------+---------+-------+----------+-----+---+
|  1|       Homer|     Hill|      1|    Donuts| 7.99|  2|
|  2|      Margie|     Hill|      2|   Bourbon|15.49|  1|
|  3|Bartholomew |     Hill|      3|  Slushies| 4.99|  1|
|  4|        Hank|  Simpson|      4|     Steak|18.95|  3|
|  5|       Peggy|  Simpson|      5|    Turkey|15.67|  1|
|  6|      Bobby |  Simpson|      6|Deli Meats| 5.99|  5|
+---+------------+---------+-------+----------+-----+---+



In [107]:
# Left Outer Join
table2 = spark.sql("""
    SELECT p.*, f.Item, f.Cost
    FROM people as p
    LEFT OUTER JOIN cust_food as f
    ON p.id = f.cust_id
""")
table2.show()

+---+------------+---------+----------+-----+
| Id|  First_Name|Last_Name|      Item| Cost|
+---+------------+---------+----------+-----+
|  1|       Homer|     Hill|    Donuts| 7.99|
|  2|      Margie|     Hill|   Bourbon|15.49|
|  3|Bartholomew |     Hill|  Slushies| 4.99|
|  4|        Hank|  Simpson|     Steak|18.95|
|  5|       Peggy|  Simpson|    Turkey|15.67|
|  6|      Bobby |  Simpson|Deli Meats| 5.99|
+---+------------+---------+----------+-----+



## Subqueries

In [109]:
# Find the people who spent less than $20 in total (cost * qty).
subquery1 = spark.sql("""
    SELECT p.first_name
    FROM people AS p
    INNER JOIN (
        SELECT cust_id
        FROM (
            SELECT cust_id, cost * qty AS total_cost
            FROM cust_food
        ) AS cf
        WHERE total_cost < 20
    ) AS filtered_cf
    ON p.id = filtered_cf.cust_id
""")
subquery1.show()

+------------+
|  first_name|
+------------+
|       Homer|
|      Margie|
|Bartholomew |
|       Peggy|
+------------+
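
The nested subquery can also be flattened into a single join with a `WHERE` clause. To check that the two forms agree without needing a Spark cluster, the sketch below runs both against an in-memory SQLite database from Python's standard library. This is an assumption-laden stand-in: SQLite's dialect is not Spark SQL, the rows are copied by hand from the tables above (names trimmed), and an `ORDER BY` is added so the output order is deterministic.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (id INTEGER, first_name TEXT)")
conn.execute("CREATE TABLE cust_food (cust_id INTEGER, cost REAL, qty INTEGER)")
conn.executemany(
    "INSERT INTO people VALUES (?, ?)",
    [(1, "Homer"), (2, "Margie"), (3, "Bartholomew"),
     (4, "Hank"), (5, "Peggy"), (6, "Bobby")],
)
conn.executemany(
    "INSERT INTO cust_food VALUES (?, ?, ?)",
    [(1, 7.99, 2), (2, 15.49, 1), (3, 4.99, 1),
     (4, 18.95, 3), (5, 15.67, 1), (6, 5.99, 5)],
)

# Same shape as the nested query above.
nested_query = """
    SELECT p.first_name
    FROM people AS p
    INNER JOIN (
        SELECT cust_id
        FROM (
            SELECT cust_id, cost * qty AS total_cost
            FROM cust_food
        ) AS cf
        WHERE total_cost < 20
    ) AS filtered_cf
    ON p.id = filtered_cf.cust_id
    ORDER BY p.id
"""

# Flattened equivalent: join first, then filter on the computed total.
flat_query = """
    SELECT p.first_name
    FROM people AS p
    INNER JOIN cust_food AS f ON p.id = f.cust_id
    WHERE f.cost * f.qty < 20
    ORDER BY p.id
"""

nested_names = [row[0] for row in conn.execute(nested_query)]
flat_names = [row[0] for row in conn.execute(flat_query)]
conn.close()

print(nested_names)  # ['Homer', 'Margie', 'Bartholomew', 'Peggy']
print(flat_names)    # ['Homer', 'Margie', 'Bartholomew', 'Peggy']
```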



# Exercises

1. Find the number of items in the list where the length is 3. 

In [40]:
# Answer:
rdd = sc.textFile("./problem1.txt")
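# Strip the quotation marks, split into words, and keep the distinct words of length 3.
# (Replace collect() with count() to get the number of such words.)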
rdd.map(lambda x: x.replace('"', '')).flatMap(lambda x: x.split(" ")).filter(lambda x: len(x)==3).distinct().collect()

['ago',
 'was',
 'run',
 'But',
 'try',
 'not',
 'you',
 'man',
 'You',
 'me,',
 'get',
 "I'm"]

2. Take the first 2 letters of each word in the RDD, count how often each prefix occurs, and sort the prefixes in ascending order. (Continue using problem1.txt.)

In [54]:
# Answer
rdd_split = rdd.map(lambda x: x.replace('"', '')).flatMap(lambda x: x.split(" ")).map(lambda word: (word[:2],1)).sortByKey()
rdd_split.countByKey()

defaultdict(int,
            {'A': 1,
             'Bu': 1,
             'Co': 2,
             'He': 1,
             'I': 3,
             "I'": 1,
             'Se': 1,
             'Tr': 1,
             'We': 1,
             'Wh': 1,
             'Yo': 2,
             'a': 3,
             'ag': 1,
             'aw': 2,
             'bl': 1,
             'ca': 1,
             'co': 1,
             'do': 1,
             'dr': 1,
             'ea': 1,
             'ey': 1,
             'ge': 1,
             'gi': 1,
             'ha': 1,
             'he': 3,
             'hi': 1,
             'ho': 1,
             'in': 1,
             'it': 1,
             'kn': 3,
             'la': 1,
             'le': 1,
             'li': 1,
             'lo': 3,
             'ma': 3,
             'me': 4,
             'ne': 1,
             'ni': 1,
             'no': 2,
             'on': 1,
             'ot': 1,
             'pr': 1,
             'ri': 1,
             'ru': 1,
             'sa':

3. Fill in the missing (NULL) values in the DataFrame with the column average. (Use cars.csv)

In [61]:
car_df = spark.read.csv('cars.csv', inferSchema=True, header=True)
car_df.show()

+---+------------+------+------+----+----+----+
| Id|         car|person|  cost| _c4| _c5| _c6|
+---+------------+------+------+----+----+----+
|  1|    Corvette|  Katy|100000|NULL|NULL|NULL|
|  2|      Camero|  Ryan| 50000|NULL|NULL|NULL|
|  3|Lamborghini | Kevin|160000|NULL|NULL|NULL|
|  4|    Ferarri |  Ryan|200000|NULL|NULL|NULL|
|  5|          GT|  Katy|  NULL|NULL|NULL|NULL|
|  6|     Mustang| Kevin| 50000|NULL|NULL|NULL|
+---+------------+------+------+----+----+----+



24/07/29 08:25:56 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Id, car, person, cost, , , 
 Schema: Id, car, person, cost, _c4, _c5, _c6
Expected: _c4 but found: 
CSV file: file:///Users/ameek/Documents/UCSD_Q3/cars.csv


In [62]:
# Drop columns 
car_df = car_df.drop("_c4")
car_df = car_df.drop("_c5")
car_df = car_df.drop("_c6")

In [63]:
car_df.show()

+---+------------+------+------+
| Id|         car|person|  cost|
+---+------------+------+------+
|  1|    Corvette|  Katy|100000|
|  2|      Camero|  Ryan| 50000|
|  3|Lamborghini | Kevin|160000|
|  4|    Ferarri |  Ryan|200000|
|  5|          GT|  Katy|  NULL|
|  6|     Mustang| Kevin| 50000|
+---+------------+------+------+



In [75]:
avg_cost = car_df.select(avg('cost'))
avg_cost.show()
avg_cost = avg_cost.collect()

+---------+
|avg(cost)|
+---------+
| 112000.0|
+---------+



In [76]:
# collect() returns a list of Rows; take the first column of the first Row
avg_cost_val = avg_cost[0][0]
avg_cost_val

112000.0

In [77]:
# Fill in the NULL with the average!
car_df.na.fill(avg_cost_val, subset=['cost']).show()

+---+------------+------+------+
| Id|         car|person|  cost|
+---+------------+------+------+
|  1|    Corvette|  Katy|100000|
|  2|      Camero|  Ryan| 50000|
|  3|Lamborghini | Kevin|160000|
|  4|    Ferarri |  Ryan|200000|
|  5|          GT|  Katy|112000|
|  6|     Mustang| Kevin| 50000|
+---+------------+------+------+
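
The same mean-imputation steps can be sketched in plain Python, which makes the arithmetic easy to check by hand. The `costs` list copies the `cost` column above, with `None` standing in for NULL.

```python
costs = [100000, 50000, 160000, 200000, None, 50000]

# Average over the known values only (NULLs are skipped, as with avg()).
known = [c for c in costs if c is not None]
mean_cost = sum(known) / len(known)

# Replace each missing value with that average.
filled = [mean_cost if c is None else c for c in costs]

print(mean_cost)  # 112000.0
print(filled)     # [100000, 50000, 160000, 200000, 112000.0, 50000]
```

In the Spark output above the filled value prints as 112000 rather than 112000.0, which is consistent with the fill value being converted to the integer type of the `cost` column.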



# Part 2 will be Machine Learning with Spark! 