![](http://spark.apache.org/images/spark-logo.png)


# Examples

In [4]:
def sql(query):
    
    conn = sqlite3.connect(';memory;')
    cursor = conn.cursor()
    
    start_location = query.find('from')
    table_name = query[start_location + len('from '):query[start_location].find(' ')]
    
    cursor = conn.execute(query)
    names = [description[0] for description in cursor.description]
    
    cursor = conn.cursor()
    cursor.execute(query)
    result = cursor.fetchall()
    
    return pd.DataFrame(result, columns=names)

## Customer Placing the Largest Number of Orders

In [57]:
import pandas as pd
import sqlite3

# Connect to sqlite
conn = sqlite3.connect(';memory;')
cursor = conn.cursor()

# Insert data into sqlite
queries = '''
drop table if exists orders;
Create table If Not Exists orders (order_number int, customer_number int);
insert into orders (order_number, customer_number) values ('1', '1');
insert into orders (order_number, customer_number) values ('2', '2');
insert into orders (order_number, customer_number) values ('3', '3');
insert into orders (order_number, customer_number) values ('4', '3');
'''

query_list = queries.splitlines()[1:]

for query in query_list:
    cursor.execute(query)

conn.commit()

# Create a Pandas DataFrame from the data
df = sql('''
select *
from orders
''')

In [56]:
from pyspark.sql import SparkSession

def customer_orders(df):
    
    # Create a spark session, a spark dataframe and a temporary view
    spark = SparkSession.builder.appName('Customer Orders').getOrCreate()
    df = spark.createDataFrame(data = df)
    df.createOrReplaceTempView('orders')
    
    # Query the data
    query = '''

    select 
        o.customer_number
    from orders as o
    group by 
        o.customer_number
    order by count(*) desc
    limit 1;
    '''

    # Return results
    results = spark.sql(query)
    return results.show()

customer_orders(df)

+---------------+
|customer_number|
+---------------+
|              3|
+---------------+



In [58]:
from pyspark.sql import SparkSession

def customers_orders(df):
    
    # Create a spark session, a spark dataframe and a temporary view
    spark = SparkSession.builder.appName('Customer Orders').getOrCreate()
    df = spark.createDataFrame(data=df)
    df.createOrReplaceTempView('orders')
    
    # Query the data
    temp = df.select(df[1]).groupby(df[1]).count()
    results = temp.select(temp[0]).orderBy(temp[1], ascending=0).limit(num=1)
    return results.show()

customers_orders(df)

+---------------+
|customer_number|
+---------------+
|              3|
+---------------+



## Consecutive Numbers
Write an SQL query to find all numbers that appear at least three times consecutively.

In [43]:
from pyspark.sql import SparkSession

def consecutiveNumbers():
    
    spark = SparkSession.builder.appName('ConsecutiveNumber').getOrCreate()

    df = spark.read.option('header', 'true').csv('logs.csv')
    df.createOrReplaceTempView('logs')

    query = '''
    select 
        distinct l1.num as ConsecutiveNumber
    from logs as l1
    join logs as l2
    on l1.id = l2.id - 1
    join logs as l3
    on l2.id = l3.id -1
    where 
        l1.num = l2.num and l2.num = l3.num
    '''
    result = spark.sql(query)
    return result

In [44]:
result = consecutiveNumbers()
result.show()

+-----------------+
|ConsecutiveNumber|
+-----------------+
|              257|
|              274|
|              130|
|               63|
|              451|
|               98|
|              483|
|              231|
|               64|
|              154|
|              391|
|              334|
|               85|
|              110|
|              271|
|              247|
+-----------------+



## Game Play Analysis I

Write an SQL query to report the first login date for each player.

In [3]:
from pyspark.sql import SparkSession

def game_play_analysis():
    
    spark = SparkSession.builder.appName('game_play').getOrCreate()
    df = spark.read.option('header', 'true').csv('activity.csv')
    df.createOrReplaceTempView('activity')
    
    query = '''
    select 
    a1.player_id,
    min(a1.event_date) as first_login
    from activity as a1
    group by a1.player_id;
    '''
    
    result = spark.sql(query)
    return result

result = game_play_analysis()
result.show()

+---------+-----------+
|player_id|first_login|
+---------+-----------+
|        1| 2016-03-01|
|        2| 2017-06-25|
|        3| 2016-03-02|
+---------+-----------+



## Rank Score

In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

In [33]:
def rank_score():
    
    # Starting a session
    spark = SparkSession.builder.appName('Nth_Highest_Salary').getOrCreate()

    # loading the data
    df = spark.read.option('header', 'true').option('inferSchema', 'true')\
    .csv('scores.csv')

    df.createOrReplaceTempView('scores')

    query = '''
    select 
        aaa.score,
        aaa.rank
    from 
    (
        select 
            s1.id,
            s1.score,
            count(*) as rank
        from scores as s1, (select distinct score from scores) as s2
        where s1.score <= s2.score
        group by s1.id, s1.score
        order by s1.score desc
    ) aaa
    '''

    result = spark.sql(query)
    return result

In [34]:
result = rank_score()
result.show()

+-----+----+
|score|rank|
+-----+----+
|  4.0|   1|
|  4.0|   1|
| 3.85|   2|
| 3.65|   3|
| 3.65|   3|
|  3.5|   4|
+-----+----+



## Nth_Highest_Salary

In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession


def Nth_Highest_Salary(n):
    
    # Starting a session
    spark = SparkSession.builder.appName('Nth_Highest_Salary').getOrCreate()
    
    # loading the data
    df = spark.read.option('header', 'true').option('inferSchema', 'true')\
    .csv('employee.csv')
    
    # Create a temp view called employee
    df.createOrReplaceTempView('employee')
    
    # Query Data
    query = '''
    SELECT min(salary) as Nth_Highest_Salary
    from
    (
    select salary
    FROM Employee
    ORDER BY salary DESC
    LIMIT {}
    )
    '''.format(n)

    result = spark.sql(query)
    return result


result = Nth_Highest_Salary(2)
result.show()

+------------------+
|Nth_Highest_Salary|
+------------------+
|               200|
+------------------+



# Connect Spark to Databases

## Postgres

### Extract - tables called movies and users

- Download postgresql-42.2.18.jar and put into the location: "C:/Users/UserPc/Downloads/postgresql-42.2.18.jar"
- Launch a postgres server with the details below.
- The postgres server should have a database called:  "data_engineering_linkedin"

In [None]:
##import required libraries
import pyspark
from pyspark.sql import SparkSession

'''
Create Spark Session
'''
spark = pyspark.sql.SparkSession \
   .builder \
   .appName("Python Spark SQL basic example") \
   .config('spark.driver.extraClassPath', "C:/Users/UserPc/Downloads/postgresql-42.2.18.jar") \
   .getOrCreate()


'''
Extract - From Postgres
'''
def extract_movie_table():

   ##read table from db using spark jdbc
   movies_df = spark.read \
      .format("jdbc") \
      .option("url", "jdbc:postgresql://localhost:5432/data_engineering_linkedin") \
      .option("dbtable", "movies") \
      .option("user", "postgres") \
      .option("password", "1365") \
      .option("driver", "org.postgresql.Driver") \
      .load()

    return movies_df

def extract_users_table():

   ##read table from db using spark jdbc
   users_df = spark.read \
      .format("jdbc") \
      .option("url", "jdbc:postgresql://localhost:5432/data_engineering_linkedin") \
      .option("dbtable", "users") \
      .option("user", "postgres") \
      .option("password", "1365") \
      .option("driver", "org.postgresql.Driver") \
      .load()

    return users_df

+---+---------+--------------------+----------+
| id|     name|         description|  category|
+---+---------+--------------------+----------+
|  1|   Avatar|Avatar is a 2009 ...|    Sci-Fi|
|  2| Avengers|  Avengers is a 2009|    Sci-Fi|
|  3| Holidate|  Holidate is a 2009|Not Action|
|  4|John Wick| John Wick is a 2009|    Action|
+---+---------+--------------------+----------+

### Load to Postgres

In [None]:
def load_df_to_db(df):

   ##load transformed dataframe to the database
    properties = {"user": "postgres",
                  "password": "1365",
                  "driver": "org.postgresql.Driver"
                  }
                  
    df.write.jdbc(url="jdbc:postgresql://localhost:5432/data_engineering_linkedin",
                  table = "avg_ratings",
                  mode = "overwrite",
                  properties = properties)

### Full ETL Example - Extract from Postgres, transform and load to Postgres

In [None]:
##import required libraries
import pyspark
from pyspark.sql import SparkSession

'''
Create Spark Session
'''
spark = pyspark.sql.SparkSession \
   .builder \
   .appName("Python Spark SQL basic example") \
   .config('spark.driver.extraClassPath', "C:/Users/UserPc/Downloads/postgresql-42.2.18.jar") \
   .getOrCreate()


'''
Extract - From Postgres
'''
def extract_movie_table():

   ##read table from db using spark jdbc
   movies_df = spark.read \
      .format("jdbc") \
      .option("url", "jdbc:postgresql://localhost:5432/data_engineering_linkedin") \
      .option("dbtable", "movies") \
      .option("user", "postgres") \
      .option("password", "1365") \
      .option("driver", "org.postgresql.Driver") \
      .load()

    return movies_df

def extract_users_table():

   ##read table from db using spark jdbc
   users_df = spark.read \
      .format("jdbc") \
      .option("url", "jdbc:postgresql://localhost:5432/data_engineering_linkedin") \
      .option("dbtable", "users") \
      .option("user", "postgres") \
      .option("password", "1365") \
      .option("driver", "org.postgresql.Driver") \
      .load()

    return users_df


'''
Transform - Find average ratings per moive
'''
def transform_average_ratings(movies_df, users_df):

   # Create temporary views for both tables
   movies_df.createOrReplaceTempView('movies')
    users_df.createOrReplaceTempView('users')

   # Extract average ratings
   query = '''
   select 
      u.movie_id,
      m.name,
      avg(rating) as Average_Rating
   from users as u
   join movies as m
   on u.movie_id = m.id
   group by 
      u.movie_id,
      m.name
   order by u.movie_id desc;
   '''

    results = spark.sql(query)
    return results


'''
Load - back to Postgres
'''

def load_df_to_db(df):

   ##load transformed dataframe to the database
    properties = {"user": "postgres",
                  "password": "1365",
                  "driver": "org.postgresql.Driver"
                  }
                  
    df.write.jdbc(url="jdbc:postgresql://localhost:5432/data_engineering_linkedin",
                  table = "avg_ratings",
                  mode = "overwrite",
                  properties = properties)



if __name__ == "__main__":
   
   # Extract 
    movies_df = extract_movie_table()
    users_df = extract_users_table()

   # Transform
   ratings_df = transform_average_ratings(movies_df, users_df)

   # Load
   load_df_to_db(ratings_df)


## Data Camp Cheatsheet

http://datacamp-community-prod.s3.amazonaws.com/02213cb4-b391-4516-adcd-57243ced8eed

## Notion Resources

https://www.notion.so/bardadon/Udemy-Taming-Big-Data-with-Apache-Spark-and-Python-Hands-On-4bc70632803f4801b513df2ed99a41f3

In [6]:
import findspark
findspark.init()

In [11]:
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [15]:
import pyspark

# <ins>__Spark RDD__<ins>

# Initializing SparkContext

In [2]:
from pyspark import SparkConf, SparkContext

# Settings configurations
conf = SparkConf().setMaster("local").setAppName("RatingsHistogram")
sc = SparkContext(conf = conf)

## Spark Context UI

In [7]:
# Load the Spark UI in the localhost
sc

# Creating an RDD

## From a file

In [17]:
# Create an rdd from the file u.data
rdd = sc.textFile('u.data')

## From Hadoop

In [None]:
rdd = sc.textFile('hdfs://user/maria_dev/ml-100k/u.data')

## Hard coded RDD

In [27]:
rdd = sc.parallelize([1,2,3,4])

## RDD from a Hive database

In [None]:
hivCtx = HiveContext(sc)
rdd = hivCtx.sql("select name, age  from users")

## Dictionary RDD

In [48]:
rdd = sc.parallelize([1,2,3,4])
new_rdd = rdd.map(lambda x: (x,1))
new_rdd.collect()

[(1, 1), (2, 1), (3, 1), (4, 1)]

# Spark Context Commands

## Common Commands
- map
- flatmap
- filter
- distinct
- sample
- union
- intersection
- subtract
- cartesion

In [19]:
rdd1 = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize([2,3,4,5])

### map
- Return a new RDD, by applying a function.
- For example, lets return a new rdd where each item is raised to second power.

__1. Using lambda function__


In [22]:
new_rdd = rdd1.map(lambda x: x**2)

In [25]:
new_rdd.collect()

[1, 4, 9, 16]

__2. Using my own function inside lambda__

In [34]:
def power_2(x):
    x_pow_2 = x**2
    return x_pow_2

In [35]:
new_rdd = rdd1.map(lambda x: power_2(x))

In [36]:
new_rdd.collect()

[1, 4, 9, 16]

__3. Using my own function__

In [None]:
# Take the age and number of friends
def parseLine(line):
    fields = line.split(',')
    age = int(fields[2])
    numFriends = int(fields[3])
    return (age, numFriends)

In [62]:
# Creating an rdd from a csv file
rdd1 = sc.textFile('fakefriends.csv')

In [65]:
# Print a sample of the first 5 items
rdd1.collect()[0:5]

['0,Will,33,385',
 '1,Jean-Luc,26,2',
 '2,Hugh,55,221',
 '3,Deanna,40,465',
 '4,Quark,68,21']

- Now lets collect the age and number of friends

In [67]:
new_rdd = rdd1.map(parseLine)

In [69]:
new_rdd.collect()[0:5]

[(33, 385), (26, 2), (55, 221), (40, 465), (68, 21)]

### flatmap
- Returns a new flat RDD.
- By flat i mean that it takes a dictionary rdd and flatten it, it puts the keys and value in the same iteratble
- For example:

In [49]:
rdd1 = sc.parallelize([1,2,3,4])

In [50]:
# Creating a dictionary RDD using map
new_rdd = rdd1.map(lambda x: (x,1))

In [51]:
new_rdd.collect()

[(1, 1), (2, 1), (3, 1), (4, 1)]

In [52]:
# Creating a flat RDD using flatMap
new_rdd = rdd1.flatMap(lambda x: (x,1))

In [53]:
new_rdd.collect()

[1, 1, 2, 1, 3, 1, 4, 1]

- As we can see, these are the same values, only flat

### Difference between map and flatMap
- Lets split each line in the book using map and flatMap and check the results

In [186]:
book.collect()[0:10]

['Self-Employment: Building an Internet Business of One',
 'Achieving Financial and Personal Freedom through a Lifestyle Technology Business',
 'By Frank Kane',
 '',
 '',
 '',
 'Copyright � 2015 Frank Kane. ',
 'All rights reserved worldwide.',
 '',
 '']

In [184]:
flat_words = book.flatMap(lambda x: x.split())
flat_words.collect()[0:10]

['Self-Employment:',
 'Building',
 'an',
 'Internet',
 'Business',
 'of',
 'One',
 'Achieving',
 'Financial',
 'and']

In [185]:
words = book.map(lambda x: x.split())
words.collect()[0:10]

[['Self-Employment:', 'Building', 'an', 'Internet', 'Business', 'of', 'One'],
 ['Achieving',
  'Financial',
  'and',
  'Personal',
  'Freedom',
  'through',
  'a',
  'Lifestyle',
  'Technology',
  'Business'],
 ['By', 'Frank', 'Kane'],
 [],
 [],
 [],
 ['Copyright', '�', '2015', 'Frank', 'Kane.'],
 ['All', 'rights', 'reserved', 'worldwide.'],
 [],
 []]

- As we can see, with flatMap, each line has been split into words
- And with map, each line is now a list of words.
- So by using flat, we kinda go down a level and extract the items into a single iterable

### filter
- Returns a new rdd that meets the conditions of a function
- For example

In [54]:
rdd1 = sc.parallelize([1,2,3,4])

In [55]:
# Using lambda - Collect all the items that are bigger then 2
new_rdd = rdd1.filter(lambda x: x > 2)

In [56]:
new_rdd.collect()

[3, 4]

### mapValues
- Return a new dictionary rdd
- Run a function on each value of the items, without changing the keys.
- This also retains the original RDD's partitioning to make it much faster


In [70]:
new_rdd.collect()[0:5]

[(33, 385), (26, 2), (55, 221), (40, 465), (68, 21)]

In [75]:
# Make the value into a key-value pair of (value, 1)
new_rdd = new_rdd.mapValues(lambda x: (x,1))

In [76]:
new_rdd.collect()[0:5]

[(33, (385, 1)), (26, (2, 1)), (55, (221, 1)), (40, (465, 1)), (68, (21, 1))]

- As we can see, the function only handled the values and not the keys

# RDD Actions
- Once we have the rdd, we can run some RDD actions on it

## Common RDD Actions
- collect
- count
- countByValue
- take
- top
- reduce

In [8]:
# Now we can run commands on the file
rdd.count()

100000

In [28]:
# Return a list with all the contents
rdd.collect()

[1, 2, 3, 4]

In [18]:
# Return the first element
rdd.first()

'196\t242\t3\t881250949'

In [None]:
# Stop spark context before making another one
sc.stop()

## Advanced RDD Actions
- reduceByKey - Merge values for each key, according to a function
- countByValue() - Return the count of each unique value in a RDD as a dictionary of (value, count) pairs.
- groupByKey() - Group values with the same key
- sortByKey() - Sort values by key
- keys(), values() - Return an RDD of just the keys or values

__SQL Actions__
- join, rightOuterJoin, leftOuterJoin, cogroup, subtractByKey

In [77]:
new_rdd.collect()[0:5]

[(33, (385, 1)), (26, (2, 1)), (55, (221, 1)), (40, (465, 1)), (68, (21, 1))]

__reduceByKey__

In [79]:
total_friends_by_age = new_rdd.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

In [81]:
total_friends_by_age.collect()[0:5]

[(33, (3904, 12)),
 (26, (4115, 17)),
 (55, (3842, 13)),
 (40, (4264, 17)),
 (68, (2696, 10))]

- Another example

In [292]:
parsedLines.collect()[0:5]

[(44, 37.19), (35, 65.89), (2, 40.64), (47, 14.98), (29, 13.08)]

- We can use reduceByKey to sum the second column by merging similar keys

In [307]:
# Sum the amount per customer
amount_per_customer = parsedLines.reduceByKey(lambda x, y: x + y)

In [308]:
amount_per_customer.collect()[0:5]

[(44, 4756.8899999999985),
 (35, 5155.419999999999),
 (2, 5994.59),
 (47, 4316.299999999999),
 (29, 5032.529999999999)]

- We can also add values with the add method:

In [312]:
from operator import add

# Sum the amount per customer
amount_per_customer = parsedLines.reduceByKey(add)

# Print the top 10 rows
amount_per_customer.collect()[0:10]

[(44, 4756.8899999999985),
 (35, 5155.419999999999),
 (2, 5994.59),
 (47, 4316.299999999999),
 (29, 5032.529999999999),
 (91, 4642.259999999999),
 (70, 5368.249999999999),
 (85, 5503.43),
 (53, 4945.299999999999),
 (14, 4735.030000000001)]

__countByValue__

In [206]:
# We have a list of words
words.collect()[0:5]

['Self-Employment:', 'Building', 'an', 'Internet', 'Business']

In [210]:
# Count unique values into a dictionary
words_dict = words.countByValue()
words_dict

defaultdict(int,
            {'Self-Employment:': 1,
             'Building': 5,
             'an': 172,
             'Internet': 13,
             'Business': 19,
             'of': 941,
             'One': 12,
             'Achieving': 1,
             'Financial': 3,
             'and': 901,
             'Personal': 3,
             'Freedom': 7,
             'through': 55,
             'a': 1148,
             'Lifestyle': 5,
             'Technology': 2,
             'By': 9,
             'Frank': 10,
             'Kane': 7,
             'Copyright': 1,
             '�': 174,
             '2015': 3,
             'Kane.': 1,
             'All': 13,
             'rights': 3,
             'reserved': 2,
             'worldwide.': 2,
             'CONTENTS': 1,
             'Disclaimer': 1,
             'Preface': 1,
             'Part': 2,
             'I:': 2,
             'Making': 5,
             'the': 1176,
             'Big': 1,
             'Decision': 1,
             'Overcoming'

__sortBy__

In [295]:
amount_per_customer.collect()[0:5]

[(44, 94.79), (35, 28.2), (2, 56.05), (47, 75.45), (29, 46.37)]

In [296]:
# Sort it by the second column(amount paid)
sorted_amount_per_customer = amount_per_customer.sortBy(lambda x: x[1], ascending=False)

In [297]:
sorted_amount_per_customer.collect()[0:5]

[(83, 98.32), (54, 98.29), (77, 98.29), (42, 95.58), (44, 94.79)]

# RDD Examples

## Creating a rating historgram by counting values from a file

In [13]:
from pyspark import SparkConf, SparkContext
import collections

# Settings configurations
conf = SparkConf().setMaster("local").setAppName("RatingsHistogram")
sc = SparkContext(conf = conf)

# Loading the file  u.data
lines = sc.textFile("u.data")

# Parsing it
ratings = lines.map(lambda x: x.split()[2])

# Counting ratings
result = ratings.countByValue()

# Sorting results
sortedResults = collections.OrderedDict(sorted(result.items()))
for key, value in sortedResults.items():
    print("%s %i" % (key, value))


1 6110
2 11370
3 27145
4 34174
5 21201


- Looks like the most frequent rating is 4 starts

In [15]:
lines.count()

100000

## Count the average number of friends per age

In [59]:
sc.stop()

from pyspark import SparkConf, SparkContext

# Configure and start a context manager
conf = SparkConf().setMaster("local").setAppName("FriendsByAge")
sc = SparkContext(conf = conf)

# Take the age and number of friends
def parseLine(line):
    fields = line.split(',')
    age = int(fields[2])
    numFriends = int(fields[3])
    return (age, numFriends)

# Sum the number of friends per age
lines = sc.textFile("fakefriends.csv")
rdd = lines.map(parseLine)
totalsByAge = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

# Take the average number of friends
averagesByAge = totalsByAge.mapValues(lambda x: x[0] / x[1])

# Create a list of that result
results = averagesByAge.collect()

# Print a sample of 10 results
counter = 0
for result in results:
    
    print(result)
    counter += 1
    if counter == 10:
        break;

(33, 325.3333333333333)
(26, 242.05882352941177)
(55, 295.53846153846155)
(40, 250.8235294117647)
(68, 269.6)
(59, 220.0)
(37, 249.33333333333334)
(54, 278.0769230769231)
(38, 193.53333333333333)
(27, 228.125)


## Filter the minimum temperature by station id

In [83]:
sc.stop()

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("MinTemperatures")
sc = SparkContext(conf = conf)

def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)

lines = sc.textFile("1800.csv")
parsedLines = lines.map(parseLine)
minTemps = parsedLines.filter(lambda x: "TMIN" in x[1])
stationTemps = minTemps.map(lambda x: (x[0], x[2]))
minTemps = stationTemps.reduceByKey(lambda x, y: min(x,y))
results = minTemps.collect();

for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))


ITE00100554	5.36F
EZE00100082	7.70F


## Find the maximum temperature for each station

In [85]:
# Create a Spark Context with configurations
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("MaxTemperatures")
sc = SparkContext(conf = conf)

In [136]:
# Lets create an rdd and look at a sample
lines = sc.textFile('1800.csv')
lines.collect()[0:5]

['ITE00100554,18000101,TMAX,-75,,,E,',
 'ITE00100554,18000101,TMIN,-148,,,E,',
 'GM000010962,18000101,PRCP,0,,,E,',
 'EZE00100082,18000101,TMAX,-86,,,E,',
 'EZE00100082,18000101,TMIN,-135,,,E,']

- First, each line is a string, so ill need to turn them into separate items

In [161]:
def parseLines(line):
    field = line.split(',')
    station_id = field[0]
    entryType = field[2]
    temp = float(field[3]) * 0.1 * (9.0 / 5.0) + 32.0
    
    return (station_id, entryType, temp)

In [162]:
parsedLines = lines.map(parseLine)

In [163]:
parsedLines.collect()[0:5]

[('ITE00100554', 'TMAX', 18.5),
 ('ITE00100554', 'TMIN', 5.359999999999999),
 ('GM000010962', 'PRCP', 32.0),
 ('EZE00100082', 'TMAX', 16.52),
 ('EZE00100082', 'TMIN', 7.699999999999999)]

- So the maxium values are rows where there's TMAX in them, lets extract the maximum value for each unique key(i.e station)

In [164]:
maxTemps = parsedLines.filter(lambda x: "TMAX" in x[1])
maxTemps.collect()[0:5]

[('ITE00100554', 'TMAX', 18.5),
 ('EZE00100082', 'TMAX', 16.52),
 ('ITE00100554', 'TMAX', 21.2),
 ('EZE00100082', 'TMAX', 24.08),
 ('ITE00100554', 'TMAX', 27.86)]

- Lets filter only the required columns

In [165]:
stationTemps = maxTemps.map(lambda x: (x[0], x[2]))

In [166]:
stationTemps.collect()[0:5]

[('ITE00100554', 18.5),
 ('EZE00100082', 16.52),
 ('ITE00100554', 21.2),
 ('EZE00100082', 24.08),
 ('ITE00100554', 27.86)]

- Now lets find the maximum values for each UNIUQE station
- Ill need to use the reduceByKey action

In [167]:
maxTemps = stationTemps.reduceByKey(lambda x, y: max(x,y))

In [168]:
maxTemps.collect()

[('ITE00100554', 90.14000000000001), ('EZE00100082', 90.14000000000001)]

- So the maximum temp was for two stations which is 90.14 Farenheight 

## Counting Word Occurences using Flatmap

In [189]:
sc.stop()

In [190]:
# Importing and setting a Spark Context
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("MaxTemperatures")
sc = SparkContext(conf = conf)

In [192]:
# Creating an RDD from a book
book = sc.textFile('Book')
book.collect()[0:25]

['Self-Employment: Building an Internet Business of One',
 'Achieving Financial and Personal Freedom through a Lifestyle Technology Business',
 'By Frank Kane',
 '',
 '',
 '',
 'Copyright � 2015 Frank Kane. ',
 'All rights reserved worldwide.',
 '',
 '',
 'CONTENTS',
 'Disclaimer',
 'Preface',
 'Part I: Making the Big Decision',
 'Overcoming Inertia',
 'Fear of Failure',
 'Career Indoctrination',
 'The Carrot on a Stick',
 'Ego Protection',
 'Your Employer as a Security Blanket',
 'Why it�s Worth it',
 'Unlimited Growth Potential',
 'Investing in Yourself, Not Someone Else',
 'No Dependencies',
 'No Commute']

- We can use flatmap to divide all of these words:

In [226]:
l = ['No Dependencies, ', 'No Commute']

In [196]:
# Use flatMap to split the lines into words
words = book.flatMap(lambda x: x.split())
words.collect()[0:25]

['Self-Employment:',
 'Building',
 'an',
 'Internet',
 'Business',
 'of',
 'One',
 'Achieving',
 'Financial',
 'and',
 'Personal',
 'Freedom',
 'through',
 'a',
 'Lifestyle',
 'Technology',
 'Business',
 'By',
 'Frank',
 'Kane',
 'Copyright',
 '�',
 '2015',
 'Frank',
 'Kane.']

- Now lets count by unique value

In [204]:
word_count = words.countByValue()
word_count

defaultdict(int,
            {'Self-Employment:': 1,
             'Building': 5,
             'an': 172,
             'Internet': 13,
             'Business': 19,
             'of': 941,
             'One': 12,
             'Achieving': 1,
             'Financial': 3,
             'and': 901,
             'Personal': 3,
             'Freedom': 7,
             'through': 55,
             'a': 1148,
             'Lifestyle': 5,
             'Technology': 2,
             'By': 9,
             'Frank': 10,
             'Kane': 7,
             'Copyright': 1,
             '�': 174,
             '2015': 3,
             'Kane.': 1,
             'All': 13,
             'rights': 3,
             'reserved': 2,
             'worldwide.': 2,
             'CONTENTS': 1,
             'Disclaimer': 1,
             'Preface': 1,
             'Part': 2,
             'I:': 2,
             'Making': 5,
             'the': 1176,
             'Big': 1,
             'Decision': 1,
             'Overcoming'

- Now lets improve this by using regex to filter only the words(and get rid of all the non-word characters)

In [230]:
sc.stop()

import re
from pyspark import SparkConf, SparkContext

def normalizeWords(text):
    return re.compile(r'\W+', re.UNICODE).split(text.lower())

conf = SparkConf().setMaster("local").setAppName("WordCount")
sc = SparkContext(conf = conf)

input = sc.textFile("book")
words = input.flatMap(normalizeWords)
wordCounts = words.countByValue()

counter = 0
for word, count in wordCounts.items():
    cleanWord = word.encode('ascii', 'ignore')
    if (cleanWord):
        print(cleanWord.decode() + " " + str(count))
    
    counter += 1
    if counter == 5:
        break;


self 111
employment 75
building 33
an 178
internet 26


Next, lets sort it:

In [234]:
sc.stop()

import re
from pyspark import SparkConf, SparkContext

def normalizeWords(text):
    return re.compile(r'\W+', re.UNICODE).split(text.lower())

conf = SparkConf().setMaster("local").setAppName("WordCount")
sc = SparkContext(conf = conf)

input = sc.textFile("book")
words = input.flatMap(normalizeWords)

wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
wordCountsSorted = wordCounts.map(lambda x: (x[1], x[0])).sortByKey()
results = wordCountsSorted.collect()

for result in results:
    count = str(result[0])
    word = result[1].encode('ascii', 'ignore')
    if (word) and int(count) > 1000:
        print(word.decode() + ":\t\t" + count)


a:		1191
the:		1292
your:		1420
to:		1828
you:		1878


## Sum the total amount paid per customerid 

- the first line is the customer id, the second line is the amount paid
- We should first split the rdd into only the first and third columns.
- Then sum the total paid by customerId.

In [238]:
from pyspark import SparkConf, SparkContext

In [252]:
sc.stop()

conf = SparkConf().setMaster('local').setAppName('CountAmountPaid')
sc = SparkContext(conf=conf)

In [253]:
orders = sc.textFile('customer-orders.csv')
orders.collect()[0:5]

['44,8602,37.19',
 '35,5368,65.89',
 '2,3391,40.64',
 '47,6694,14.98',
 '29,680,13.08']

- First, lets parse these lines as usual

In [258]:
def parseLines(line):
    fields = line.split(',')
    customerID = int(fields[0])
    amountPaid = float(fields[2])
    
    return (customerID, amountPaid)

In [259]:
parsedLines = orders.map(parseLines)
parsedLines.collect()[0:5]

[(44, 37.19), (35, 65.89), (2, 40.64), (47, 14.98), (29, 13.08)]

- Then, sum the amount paid per customer

In [305]:
# Sum the amount per customer
amount_per_customer = parsedLines.reduceByKey(lambda x, y: x + y)

In [306]:
# Print the top 10 rows
amount_per_customer.collect()[0:10]

[(44, 4756.8899999999985),
 (35, 5155.419999999999),
 (2, 5994.59),
 (47, 4316.299999999999),
 (29, 5032.529999999999),
 (91, 4642.259999999999),
 (70, 5368.249999999999),
 (85, 5503.43),
 (53, 4945.299999999999),
 (14, 4735.030000000001)]

In [313]:
# Sort it by the second column(amount paid)
sorted_amount_per_customer = amount_per_customer.sortBy(lambda x: x[1], ascending=False)

In [314]:
# Print the top 10 rows
sorted_amount_per_customer.collect()[0:10]

[(68, 6375.449999999997),
 (73, 6206.199999999999),
 (39, 6193.109999999999),
 (54, 6065.389999999999),
 (71, 5995.660000000003),
 (2, 5994.59),
 (97, 5977.189999999995),
 (46, 5963.109999999999),
 (42, 5696.840000000003),
 (59, 5642.89)]

# <ins>__SparkSQL__<ins>

# Initialize SparkSession

In [315]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

In [316]:
spark = SparkSession.builder.appName('SparkSQL').getOrCreate()

# Create a DataFrame
- If the data that we want to load is structured(I.E tabualr form and with headers) then we can load it straight into a DatFrame.
- If the data is not structured. We should load it first into an RDD, transform it, and then load it into a DataFrame.

## Data Types:
https://spark.apache.org/docs/latest/sql-ref-datatypes.html

## Loading data without headers(unstructured)

### Example1 - mapping headers with Row

In [317]:
rdd = spark.sparkContext.textFile('fakefriends.csv')

In [319]:
rdd.collect()[0:5]

['0,Will,33,385',
 '1,Jean-Luc,26,2',
 '2,Hugh,55,221',
 '3,Deanna,40,465',
 '4,Quark,68,21']

- So the data doesnt have any headers, we cant simply load it into a DataFrame.
- Lets transform it using a function

In [320]:
def mapper(line):
    fields = line.split(',')
    return Row(
            ID = int(fields[0]),
            name = str(fields[1].encode('utf-8')),
            age = int(fields[2]),
            numFriends = int(fields[3])
    )
                      

In [321]:
structured_rdd = rdd.map(mapper)

- Now we can load it into DataFrame

In [322]:
schemaPeople = spark.createDataFrame(structured_rdd).cache()

- Finally, we need to create a temporary view from it

In [323]:
schemaPeople.createOrReplaceTempView('people')

### Example2 - Reading text
- Here we are reading a text into a DataFrame.
- When working with texts we probably should use some RDD commands as they are more suited then SQL

In [None]:
# Starting a session, loading the data, and inferring the schema
spark = SparkSession.builder.appName('SparkSQL').getOrCreate()
book = spark.read.text('Book')

- Now we have a view called people that contains the data from the CSV file.

### Example3 - Building a schema
- We can also give Spark a schema of our choosing
- For example, the dataset 1800.csv doesnt have a header, so we can load it like this:

In [374]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

spark = SparkSession.builder.appName("MinTemperatures").getOrCreate()

schema = StructType([ \
                     StructField("stationID", StringType(), True), \
                     StructField("date", IntegerType(), True), \
                     StructField("measure_type", StringType(), True), \
                     StructField("temperature", FloatType(), True)])


In [375]:
# // Read the file as dataframe
df = spark.read.schema(schema).csv("1800.csv")
df.printSchema()

root
 |-- stationID: string (nullable = true)
 |-- date: integer (nullable = true)
 |-- measure_type: string (nullable = true)
 |-- temperature: float (nullable = true)



- As we can see, each column got the header that we specified.

## Create DataFrame WITH headers(structured)

### Example1 - inferring schema

- Here we specify that the datafile has headers, and we let Spark infer the types of data himself.

In [342]:
people = spark.read.option('header', 'true').option('inferSchema', 'true')\
.csv('fakefriends-header.csv')

- We can take a look at the inffered schema

In [343]:
people.printSchema()

root
 |-- userID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- friends: integer (nullable = true)



- Everything looks good. Now we can either create a temp view and run sql queries.
- OR, run commands like filter, select, groupBy etc... straight on the DataFrame

In [353]:
people.createOrReplaceTempView('people')

- Now we can just query it as a normal DataFrame:

In [356]:
query = '''
select age,
       round(avg(friends),2) as Average
from people
where age between 13 and 25
group by age
order by age desc;
'''

df = spark.sql(query)
df.show()

+---+-------+
|age|Average|
+---+-------+
| 25| 197.45|
| 24|  233.8|
| 23|  246.3|
| 22| 206.43|
| 21| 350.88|
| 20|  165.0|
| 19| 213.27|
| 18| 343.38|
+---+-------+



### Example2 - Tab separated

In [None]:
# Load up movie data as dataframe
moviesDF = spark.read.option("sep", "\t").schema(schema).csv("u.data")

# Export Dataframe

## CSV

In [43]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('test').master('local').getOrCreate()
df = spark.read.option('header', 'true').csv("activity.csv")
df.createOrReplaceTempView('activity')

In [44]:
result = spark.sql('select * from activity')
result.show(5)

+---------+---------+----------+------------+
|player_id|device_id|event_date|games_played|
+---------+---------+----------+------------+
|     1503|        1|2021-10-15|           2|
|     4106|        3|2020-07-22|           3|
|     3170|        3|2021-07-21|           9|
|     4518|        1|2020-08-03|           1|
|     4501|        1|2020-10-22|           4|
+---------+---------+----------+------------+
only showing top 5 rows



In [45]:
result.toPandas().to_csv('test.csv')

## Parquet

In [46]:
result.write.format('parquet').mode('overwrite').option('compression', 'gzip').save('data/activity_parquet')

Py4JJavaError: An error occurred while calling o275.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:638)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:278)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1218)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1423)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$25(FileFormatWriter.scala:267)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:642)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:267)
	... 41 more


## Export with Partitioning


In [None]:
result.write.format('parquet').mode('overwrite').option('compression', 'gzip').PartitionBy('Product').save('data/activity_parquet')

## Export with Bucketing

In [47]:
result.write.format('parquet').mode('overwrite').option('compression', 'gzip').bucketBy(3,'device_id').saveAsTable("device_bucket_table")

Py4JJavaError: An error occurred while calling o284.saveAsTable.
: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1218)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1423)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.validateTableLocation(SessionCatalog.scala:384)
	at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:175)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:701)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:679)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)


# SparkSession Commands
- Broadcast - Transfer an object to the Spark Driver
- value - Get the object from the Spark Driver

## Broadcast

### Broadcasting a dictionary and retrieving it using a user defined function(udf)
- This allows us to only broadcast an object once, and that object will be stored in all of the clusters.
- This is suitable for small datasets.
- It also reduced overhead

In [None]:
# Broadcasting the dictionary to the Spark Driver
nameDict = spark.sparkContext.broadcast(loadMovieNames())

In [None]:
# Create a user-defined function to look up movie names from our broadcasted dictionary
def lookupName(movieID):
    return nameDict.value[movieID]

lookupNameUDF = func.udf(lookupName)

## Accumulator
- Store a synced variable across all clusters 

### Example - Create a Counter variable that accumulates counters, and share it across all clusters.
- Also, iterate it by 1, by using the method .add()

In [None]:
hitCounter = sc.accumulator(0)
hitCounter.add(1)

# DataFrame Examples

## Count the average number of friends per age(Structured Data)

In [357]:
# Starting a session, loading the data, and inferring the schema
spark = SparkSession.builder.appName('SparkSQL').getOrCreate()
people = spark.read.option('header', 'true').option('inferSchema', 'true')\
.csv('fakefriends-header.csv')

In [359]:
people.printSchema()

root
 |-- userID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- friends: integer (nullable = true)



In [360]:
# Creating a temp view
people.createOrReplaceTempView('people')

In [363]:
# Querying the data
query = '''
select 
    age,
    round(avg(friends), 2) as Average
from people
group by age
order by age desc
limit 5
'''

df  = spark.sql(query)
df.show()

+---+-------+
|age|Average|
+---+-------+
| 69|  235.2|
| 68|  269.6|
| 67| 214.63|
| 66| 276.44|
| 65|  298.2|
+---+-------+



## Counting words(Unstructured)

- Not every problem is a SQL problem.
- This is an example of a problem that is better suited to solve using RDD's
- Also we dont need to pick only one, we can use both

In [365]:
# Starting a session, loading the data, and inferring the schema
spark = SparkSession.builder.appName('SparkSQL').getOrCreate()
book = spark.read.text('Book')

In [368]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func



# Split using a regular expression that extracts words
words = book.select(func.explode(func.split(book.value, "\\W+")).alias("word"))
wordsWithoutEmptyString = words.filter(words.word != "")

# Normalize everything to lowercase
lowercaseWords = wordsWithoutEmptyString.select(func.lower(wordsWithoutEmptyString.word).alias("word"))

# Count up the occurrences of each word
wordCounts = lowercaseWords.groupBy("word").count()

# Sort by counts
wordCountsSorted = wordCounts.sort("count")

# Show the results.
wordCountsSorted.show(wordCountsSorted.count())


+--------------------+-----+
|                word|count|
+--------------------+-----+
|              teamed|    1|
|               boost|    1|
|        manipulation|    1|
|            preneurs|    1|
|            laureate|    1|
|                unto|    1|
|             scaling|    1|
|          recommends|    1|
|            reacting|    1|
|             frankly|    1|
|             dilemma|    1|
|       documentation|    1|
|            rejected|    1|
|             harmony|    1|
|                bryc|    1|
|             legwork|    1|
|          production|    1|
|         supervision|    1|
|             dropped|    1|
|                reis|    1|
|              landed|    1|
|               apple|    1|
|                none|    1|
|                 hot|    1|
|              proven|    1|
|          marketable|    1|
|              quoted|    1|
|              bundle|    1|
|              attend|    1|
|                spin|    1|
|             textual|    1|
|            r

- Lets try to do it using SQL:

In [370]:
# Starting a session, loading the data, and inferring the schema
spark = SparkSession.builder.appName('SparkSQL').getOrCreate()
book = spark.read.option('header', 'true').option('inferSchema', 'true')\
.text('book')

In [371]:
book.createOrReplaceTempView('book')

In [372]:
book.printSchema()

root
 |-- value: string (nullable = true)



In [373]:
query = '''
select *
from book
'''

df = spark.sql(query)
df.show()

+--------------------+
|               value|
+--------------------+
|Self-Employment: ...|
|Achieving Financi...|
|       By Frank Kane|
|                    |
|                    |
|                    |
|Copyright � 2015 ...|
|All rights reserv...|
|                    |
|                    |
|            CONTENTS|
|          Disclaimer|
|             Preface|
|Part I: Making th...|
|  Overcoming Inertia|
|     Fear of Failure|
|Career Indoctrina...|
|The Carrot on a S...|
|      Ego Protection|
|Your Employer as ...|
+--------------------+
only showing top 20 rows



- As we can see, each row is a string sentence.
- Breaking it down to words and counting everything using sql will be tough

## Average Temperature per station

In [390]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

spark = SparkSession.builder.appName("MinTemperatures").getOrCreate()

schema = StructType([ \
                     StructField("stationID", StringType(), True), \
                     StructField("date", IntegerType(), True), \
                     StructField("measure_type", StringType(), True), \
                     StructField("temperature", FloatType(), True)])

# // Read the file as dataframe
df = spark.read.schema(schema).csv("1800.csv")
df.printSchema()                                               

root
 |-- stationID: string (nullable = true)
 |-- date: integer (nullable = true)
 |-- measure_type: string (nullable = true)
 |-- temperature: float (nullable = true)



- Then we can do the excersise using rdd commands:

In [379]:
# Filter out all but TMIN entries
minTemps = df.filter(df.measure_type == "TMIN")

# Select only stationID and temperature
stationTemps = minTemps.select("stationID", "temperature")

# Aggregate to find minimum temperature for every station
minTempsByStation = stationTemps.groupBy("stationID").min("temperature")
minTempsByStation.show()

# Convert temperature to fahrenheit and sort the dataset
minTempsByStationF = minTempsByStation.withColumn("temperature",
                                                  func.round(func.col("min(temperature)") * 0.1 * (9.0 / 5.0) + 32.0, 2))\
                                                  .select("stationID", "temperature").sort("temperature")
                                                  
# Collect, format, and print the results
results = minTempsByStationF.collect()

for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))
    
spark.stop()

+-----------+----------------+
|  stationID|min(temperature)|
+-----------+----------------+
|ITE00100554|          -148.0|
|EZE00100082|          -135.0|
+-----------+----------------+

ITE00100554	5.36F
EZE00100082	7.70F


- or by using sql

In [391]:
df.createOrReplaceTempView('df')

In [392]:
query = '''
select *
from df
'''
result = spark.sql(query)
result.show()


+-----------+--------+------------+-----------+
|  stationID|    date|measure_type|temperature|
+-----------+--------+------------+-----------+
|ITE00100554|18000101|        TMAX|      -75.0|
|ITE00100554|18000101|        TMIN|     -148.0|
|GM000010962|18000101|        PRCP|        0.0|
|EZE00100082|18000101|        TMAX|      -86.0|
|EZE00100082|18000101|        TMIN|     -135.0|
|ITE00100554|18000102|        TMAX|      -60.0|
|ITE00100554|18000102|        TMIN|     -125.0|
|GM000010962|18000102|        PRCP|        0.0|
|EZE00100082|18000102|        TMAX|      -44.0|
|EZE00100082|18000102|        TMIN|     -130.0|
|ITE00100554|18000103|        TMAX|      -23.0|
|ITE00100554|18000103|        TMIN|      -46.0|
|GM000010962|18000103|        PRCP|        4.0|
|EZE00100082|18000103|        TMAX|      -10.0|
|EZE00100082|18000103|        TMIN|      -73.0|
|ITE00100554|18000104|        TMAX|        0.0|
|ITE00100554|18000104|        TMIN|      -13.0|
|GM000010962|18000104|        PRCP|     

In [396]:
query = '''
select 
    min(temperature) as min_temp,
    stationID
from df
group by stationID
'''

result = spark.sql(query)
result.show()

+--------+-----------+
|min_temp|  stationID|
+--------+-----------+
|  -148.0|ITE00100554|
|     0.0|GM000010962|
|  -135.0|EZE00100082|
+--------+-----------+



## Find the most popular Movie

In [3]:
# Imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, LongType

# Create a Spark Session
spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

# Create schema when reading u.data
schema = StructType([ \
                     StructField("userID", IntegerType(), True), \
                     StructField("movieID", IntegerType(), True), \
                     StructField("rating", IntegerType(), True), \
                     StructField("timestamp", LongType(), True)])

# Load up movie data as dataframe
moviesDF = spark.read.option("sep", "\t").schema(schema).csv("u.data")
moviesDF.printSchema()  

root
 |-- userID: integer (nullable = true)
 |-- movieID: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: long (nullable = true)



- By Using RDD Commands

In [2]:
# Some SQL-style magic to sort all movies by popularity in one line!
topMovieIDs = moviesDF.groupBy("movieID").count().orderBy(func.desc("count"))

# Grab the top 10
topMovieIDs.show(10)

# Stop the session
spark.stop()

+-------+-----+
|movieID|count|
+-------+-----+
|     50|  583|
|    258|  509|
|    100|  508|
|    181|  507|
|    294|  485|
|    286|  481|
|    288|  478|
|      1|  452|
|    300|  431|
|    121|  429|
+-------+-----+
only showing top 10 rows



- Now lets do the same in sql:

In [4]:
moviesDF.createOrReplaceTempView('moviesDF')

In [6]:
query = '''
select 
    movieID,
    count(*) as count
from moviesDF
group by movieID
order by count(*) desc
'''

df = spark.sql(query)
df.show()

+-------+-----+
|movieID|count|
+-------+-----+
|     50|  583|
|    258|  509|
|    100|  508|
|    181|  507|
|    294|  485|
|    286|  481|
|    288|  478|
|      1|  452|
|    300|  431|
|    121|  429|
|    174|  420|
|    127|  413|
|     56|  394|
|      7|  392|
|     98|  390|
|    237|  384|
|    117|  378|
|    172|  367|
|    222|  365|
|    204|  350|
+-------+-----+
only showing top 20 rows



## Find the most popular Movie + Broadcasting

In [7]:
# -*- coding: utf-8 -*-
"""
Created on Mon Sep  7 15:28:00 2020

@author: Frank
"""

from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, LongType
import codecs

# Creating a dictionary object
def loadMovieNames():
    movieNames = {}
    # CHANGE THIS TO THE PATH TO YOUR u.ITEM FILE:
    with codecs.open("u.item", "r", encoding='ISO-8859-1', errors='ignore') as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# Creating a spark session
spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

# Broadcasting the dictionary to the Spark Driver
nameDict = spark.sparkContext.broadcast(loadMovieNames())

# Create schema when reading u.data
schema = StructType([ \
                     StructField("userID", IntegerType(), True), \
                     StructField("movieID", IntegerType(), True), \
                     StructField("rating", IntegerType(), True), \
                     StructField("timestamp", LongType(), True)])


# Load up movie data as dataframe
moviesDF = spark.read.option("sep", "\t").schema(schema).csv("u.data")
movieCounts = moviesDF.groupBy("movieID").count()


# Create a user-defined function to look up movie names from our broadcasted dictionary
def lookupName(movieID):
    return nameDict.value[movieID]

# Retrieve the movie names from the broadcasted dictionary
lookupNameUDF = func.udf(lookupName)

# Add a movieTitle column using our new udf
moviesWithNames = movieCounts.withColumn("movieTitle", lookupNameUDF(func.col("movieID")))

# Sort the results
sortedMoviesWithNames = moviesWithNames.orderBy(func.desc("count"))

# Grab the top 10
sortedMoviesWithNames.show(10, False)

# Stop the session
spark.stop()


+-------+-----+-----------------------------+
|movieID|count|movieTitle                   |
+-------+-----+-----------------------------+
|50     |583  |Star Wars (1977)             |
|258    |509  |Contact (1997)               |
|100    |508  |Fargo (1996)                 |
|181    |507  |Return of the Jedi (1983)    |
|294    |485  |Liar Liar (1997)             |
|286    |481  |English Patient, The (1996)  |
|288    |478  |Scream (1996)                |
|1      |452  |Toy Story (1995)             |
|300    |431  |Air Force One (1997)         |
|121    |429  |Independence Day (ID4) (1996)|
+-------+-----+-----------------------------+
only showing top 10 rows



## Superhero Social Graph1 - Find the most popular superhero

In [58]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.appName("MostPopularSuperhero").getOrCreate()

schema = StructType([ \
                     StructField("id", IntegerType(), True), \
                     StructField("name", StringType(), True)])

names = spark.read.schema(schema).option("sep", " ").csv("marvel_names.txt")

lines = spark.read.text("marvel_graph.txt")

# Small tweak vs. what's shown in the video: we trim each line of whitespace as that could
# throw off the counts.
connections = lines.withColumn("id", func.split(func.trim(func.col("value")), " ")[0]) \
    .withColumn("connections", func.size(func.split(func.trim(func.col("value")), " ")) - 1) \
    .groupBy("id").agg(func.sum("connections").alias("connections"))
    
mostPopular = connections.sort(func.col("connections").desc()).first()

mostPopularName = names.filter(func.col("id") == mostPopular[0]).select("name").first()

print(mostPopularName[0] + " is the most popular superhero with " + str(mostPopular[1]) + " co-appearances.")



CAPTAIN AMERICA is the most popular superhero with 1933 co-appearances.


## Superhero Social Graph2 - Find the least popular superhero

In [63]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.appName("MostObscureSuperheroes").getOrCreate()

schema = StructType([ \
                     StructField("id", IntegerType(), True), \
                     StructField("name", StringType(), True)])

names = spark.read.schema(schema).option("sep", " ").csv("marvel_names.txt")

lines = spark.read.text("marvel_graph.txt")

# Small tweak vs. what's shown in the video: we trim whitespace from each line as this
# could throw the counts off by one.
connections = lines.withColumn("id", func.split(func.trim(func.col("value")), " ")[0]) \
    .withColumn("connections", func.size(func.split(func.trim(func.col("value")), " ")) - 1) \
    .groupBy("id").agg(func.sum("connections").alias("connections"))
    
minConnectionCount = connections.agg(func.min("connections")).first()[0]

minConnections = connections.filter(func.col("connections") == minConnectionCount)

minConnectionsWithNames = minConnections.join(names, "id")

print("The following characters have only " + str(minConnectionCount) + " connection(s):")

minConnectionsWithNames.select("name").show()

The following characters have only 0 connection(s):
+--------------------+
|                name|
+--------------------+
|        BERSERKER II|
|              BLARE/|
|MARVEL BOY II/MARTIN|
|MARVEL BOY/MARTIN BU|
|      GIURESCU, RADU|
|       CLUMSY FOULUP|
|              FENRIS|
|              RANDAK|
|           SHARKSKIN|
|     CALLAHAN, DANNY|
|         DEATHCHARGE|
|                RUNE|
|         SEA LEOPARD|
|         RED WOLF II|
|              ZANTOR|
|JOHNSON, LYNDON BAIN|
|          LUNATIK II|
|                KULL|
|GERVASE, LADY ALYSSA|
+--------------------+



## Superhero Social Graph3 - Find the degrees of connections + Accumelators

In [1]:
#Boilerplate stuff:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("DegreesOfSeparation")
sc = SparkContext(conf = conf)

# The characters we wish to find the degree of separation between:
startCharacterID = 5306 #SpiderMan
targetCharacterID = 14  #ADAM 3,031 (who?)

# Our accumulator, used to signal when we find the target character during
# our BFS traversal.
hitCounter = sc.accumulator(0)

def convertToBFS(line):
    fields = line.split()
    heroID = int(fields[0])
    connections = []
    for connection in fields[1:]:
        connections.append(int(connection))

    color = 'WHITE'
    distance = 9999

    if (heroID == startCharacterID):
        color = 'GRAY'
        distance = 0

    return (heroID, (connections, distance, color))


def createStartingRdd():
    inputFile = sc.textFile("marvel_graph.txt")
    return inputFile.map(convertToBFS)

def bfsMap(node):
    characterID = node[0]
    data = node[1]
    connections = data[0]
    distance = data[1]
    color = data[2]

    results = []

    #If this node needs to be expanded...
    if (color == 'GRAY'):
        for connection in connections:
            newCharacterID = connection
            newDistance = distance + 1
            newColor = 'GRAY'
            if (targetCharacterID == connection):
                hitCounter.add(1)

            newEntry = (newCharacterID, ([], newDistance, newColor))
            results.append(newEntry)

        #We've processed this node, so color it black
        color = 'BLACK'

    #Emit the input node so we don't lose it.
    results.append( (characterID, (connections, distance, color)) )
    return results

def bfsReduce(data1, data2):
    edges1 = data1[0]
    edges2 = data2[0]
    distance1 = data1[1]
    distance2 = data2[1]
    color1 = data1[2]
    color2 = data2[2]

    distance = 9999
    color = color1
    edges = []

    # See if one is the original node with its connections.
    # If so preserve them.
    if (len(edges1) > 0):
        edges.extend(edges1)
    if (len(edges2) > 0):
        edges.extend(edges2)

    # Preserve minimum distance
    if (distance1 < distance):
        distance = distance1

    if (distance2 < distance):
        distance = distance2

    # Preserve darkest color
    if (color1 == 'WHITE' and (color2 == 'GRAY' or color2 == 'BLACK')):
        color = color2

    if (color1 == 'GRAY' and color2 == 'BLACK'):
        color = color2

    if (color2 == 'WHITE' and (color1 == 'GRAY' or color1 == 'BLACK')):
        color = color1

    if (color2 == 'GRAY' and color1 == 'BLACK'):
        color = color1

    return (edges, distance, color)


#Main program here:
iterationRdd = createStartingRdd()

for iteration in range(0, 10):
    print("Running BFS iteration# " + str(iteration+1))

    # Create new vertices as needed to darken or reduce distances in the
    # reduce stage. If we encounter the node we're looking for as a GRAY
    # node, increment our accumulator to signal that we're done.
    mapped = iterationRdd.flatMap(bfsMap)

    # Note that mapped.count() action here forces the RDD to be evaluated, and
    # that's the only reason our accumulator is actually updated.
    print("Processing " + str(mapped.count()) + " values.")

    if (hitCounter.value > 0):
        print("Hit the target character! From " + str(hitCounter.value) \
            + " different direction(s).")
        break

    # Reducer combines data for each character ID, preserving the darkest
    # color and shortest path.
    iterationRdd = mapped.reduceByKey(bfsReduce)


Running BFS iteration# 1
Processing 8330 values.
Running BFS iteration# 2
Processing 220615 values.
Hit the target character! From 1 different direction(s).


## Recommend similar movies based on ratings + Cache

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
import sys

def computeCosineSimilarity(spark, data):
    # Compute xx, xy and yy columns
    pairScores = data \
      .withColumn("xx", func.col("rating1") * func.col("rating1")) \
      .withColumn("yy", func.col("rating2") * func.col("rating2")) \
      .withColumn("xy", func.col("rating1") * func.col("rating2")) 

    # Compute numerator, denominator and numPairs columns
    calculateSimilarity = pairScores \
      .groupBy("movie1", "movie2") \
      .agg( \
        func.sum(func.col("xy")).alias("numerator"), \
        (func.sqrt(func.sum(func.col("xx"))) * func.sqrt(func.sum(func.col("yy")))).alias("denominator"), \
        func.count(func.col("xy")).alias("numPairs")
      )

    # Calculate score and select only needed columns (movie1, movie2, score, numPairs)
    result = calculateSimilarity \
      .withColumn("score", \
        func.when(func.col("denominator") != 0, func.col("numerator") / func.col("denominator")) \
          .otherwise(0) \
      ).select("movie1", "movie2", "score", "numPairs")

    return result

# Get movie name by given movie id 
def getMovieName(movieNames, movieId):
    result = movieNames.filter(func.col("movieID") == movieId) \
        .select("movieTitle").collect()[0]

    return result[0]


spark = SparkSession.builder.appName("MovieSimilarities").master("local[*]").getOrCreate()

movieNamesSchema = StructType([ \
                               StructField("movieID", IntegerType(), True), \
                               StructField("movieTitle", StringType(), True) \
                               ])
    
moviesSchema = StructType([ \
                     StructField("userID", IntegerType(), True), \
                     StructField("movieID", IntegerType(), True), \
                     StructField("rating", IntegerType(), True), \
                     StructField("timestamp", LongType(), True)])
    
    
# Create a broadcast dataset of movieID and movieTitle.
# Apply ISO-885901 charset
movieNames = spark.read \
      .option("sep", "|") \
      .option("charset", "ISO-8859-1") \
      .schema(movieNamesSchema) \
      .csv("u.item")

# Load up movie data as dataset
movies = spark.read \
      .option("sep", "\t") \
      .schema(moviesSchema) \
      .csv("u.data")


ratings = movies.select("userId", "movieId", "rating")

# Emit every movie rated together by the same user.
# Self-join to find every combination.
# Select movie pairs and rating pairs
moviePairs = ratings.alias("ratings1") \
      .join(ratings.alias("ratings2"), (func.col("ratings1.userId") == func.col("ratings2.userId")) \
            & (func.col("ratings1.movieId") < func.col("ratings2.movieId"))) \
      .select(func.col("ratings1.movieId").alias("movie1"), \
        func.col("ratings2.movieId").alias("movie2"), \
        func.col("ratings1.rating").alias("rating1"), \
        func.col("ratings2.rating").alias("rating2"))


moviePairSimilarities = computeCosineSimilarity(spark, moviePairs).cache()

if (len(sys.argv) > 1):
    scoreThreshold = 0.97
    coOccurrenceThreshold = 50.0

    movieID = sys.argv[1]

    # Filter for movies with this sim that are "good" as defined by
    # our quality thresholds above
    filteredResults = moviePairSimilarities.filter( \
        ((func.col("movie1") == movieID) | (func.col("movie2") == movieID)) & \
          (func.col("score") > scoreThreshold) & (func.col("numPairs") > coOccurrenceThreshold))

    # Sort by quality score.
    results = filteredResults.sort(func.col("score").desc()).take(10)
    
    print ("Top 10 similar movies for " + getMovieName(movieNames, movieID))
    
    for result in results:
        # Display the similarity result that isn't the movie we're looking at
        similarMovieID = result.movie1
        if (similarMovieID == movieID):
            similarMovieID = result.movie2
        
        print(getMovieName(movieNames, similarMovieID) + "\tscore: " \
              + str(result.score) + "\tstrength: " + str(result.numPairs))
        


IndexError: list index out of range

## Recommend similar movies based on ratings + Partitioning and running on clusters

In [None]:
import sys
from pyspark import SparkConf, SparkContext
from math import sqrt

def loadMovieNames():
    movieNames = {}
    with open("movies.dat",  encoding='ascii', errors='ignore') as f:
        for line in f:
            fields = line.split("::")
            movieNames[int(fields[0])] = fields[1]
    return movieNames

def makePairs( userRatings ):
    ratings = userRatings[1]
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return ((movie1, movie2), (rating1, rating2))

def filterDuplicates( userRatings ):
    ratings = userRatings[1]
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return movie1 < movie2

def computeCosineSimilarity(ratingPairs):
    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0
    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1

    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)

    score = 0
    if (denominator):
        score = (numerator / (float(denominator)))

    return (score, numPairs)


conf = SparkConf()
sc = SparkContext(conf = conf)

print("\nLoading movie names...")
nameDict = loadMovieNames()

data = sc.textFile("s3n://sundog-spark/ml-1m/ratings.dat")

# Map ratings to key / value pairs: user ID => movie ID, rating
ratings = data.map(lambda l: l.split("::")).map(lambda l: (int(l[0]), (int(l[1]), float(l[2]))))

# Emit every movie rated together by the same user.
# Self-join to find every combination.
ratingsPartitioned = ratings.partitionBy(100)
joinedRatings = ratingsPartitioned.join(ratingsPartitioned)

# At this point our RDD consists of userID => ((movieID, rating), (movieID, rating))

# Filter out duplicate pairs
uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)

# Now key by (movie1, movie2) pairs.
moviePairs = uniqueJoinedRatings.map(makePairs).partitionBy(100)

# We now have (movie1, movie2) => (rating1, rating2)
# Now collect all ratings for each movie pair and compute similarity
moviePairRatings = moviePairs.groupByKey()

# We now have (movie1, movie2) = > (rating1, rating2), (rating1, rating2) ...
# Can now compute similarities.
moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity).persist()

# Save the results if desired
moviePairSimilarities.sortByKey()
moviePairSimilarities.saveAsTextFile("movie-sims")

# Extract similarities for the movie we care about that are "good".
if (len(sys.argv) > 1):

    scoreThreshold = 0.97
    coOccurenceThreshold = 50

    movieID = int(sys.argv[1])

    # Filter for movies with this sim that are "good" as defined by
    # our quality thresholds above
    filteredResults = moviePairSimilarities.filter(lambda pairSim: \
        (pairSim[0][0] == movieID or pairSim[0][1] == movieID) \
        and pairSim[1][0] > scoreThreshold and pairSim[1][1] > coOccurenceThreshold)

    # Sort by quality score.
    results = filteredResults.map(lambda pairSim: (pairSim[1], pairSim[0])).sortByKey(ascending = False).take(10)

    print("Top 10 similar movies for " + nameDict[movieID])
    for result in results:
        (sim, pair) = result
        # Display the similarity result that isn't the movie we're looking at
        similarMovieID = pair[0]
        if (similarMovieID == movieID):
            similarMovieID = pair[1]
        print(nameDict[similarMovieID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1]))


In [11]:
sc

# <ins>__SparkML__<ins>

## Examples

### Recommendation engine

In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, LongType
from pyspark.ml.recommendation import ALS
import sys
import codecs

def loadMovieNames():
    movieNames = {}
    # CHANGE THIS TO THE PATH TO YOUR u.ITEM FILE:
    with codecs.open("u.ITEM", "r", encoding='ISO-8859-1', errors='ignore') as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames


spark = SparkSession.builder.appName("ALSExample").getOrCreate()
    
moviesSchema = StructType([ \
                     StructField("userID", IntegerType(), True), \
                     StructField("movieID", IntegerType(), True), \
                     StructField("rating", IntegerType(), True), \
                     StructField("timestamp", LongType(), True)])
    
names = loadMovieNames()
    
ratings = spark.read.option("sep", "\t").schema(moviesSchema) \
    .csv("u.data")
    
print("Training recommendation model...")

als = ALS().setMaxIter(5).setRegParam(0.01).setUserCol("userID").setItemCol("movieID") \
    .setRatingCol("rating")
    
model = als.fit(ratings)

# Manually construct a dataframe of the user ID's we want recs for
userID = int(sys.argv[1])
userSchema = StructType([StructField("userID", IntegerType(), True)])
users = spark.createDataFrame([[userID,]], userSchema)

recommendations = model.recommendForUserSubset(users, 10).collect()

print("Top 10 recommendations for user ID " + str(userID))

for userRecs in recommendations:
    myRecs = userRecs[1]  #userRecs is (userID, [Row(movieId, rating), Row(movieID, rating)...])
    for rec in myRecs: #my Recs is just the column of recs for the user
        movie = rec[0] #For each rec in the list, extract the movie ID and rating
        rating = rec[1]
        movieName = names[movie]
        print(movieName + str(rating))
        



Training recommendation model...


ValueError: invalid literal for int() with base 10: '-f'

### Linear Regression

In [13]:
from __future__ import print_function

from pyspark.ml.regression import LinearRegression

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors

if __name__ == "__main__":

    # Create a SparkSession (Note, the config section is only for Windows!)
    spark = SparkSession.builder.appName("LinearRegression").getOrCreate()

    # Load up our data and convert it to the format MLLib expects.
    inputLines = spark.sparkContext.textFile("regression.txt")
    data = inputLines.map(lambda x: x.split(",")).map(lambda x: (float(x[0]), Vectors.dense(float(x[1]))))

    # Convert this RDD to a DataFrame
    colNames = ["label", "features"]
    df = data.toDF(colNames)

    # Note, there are lots of cases where you can avoid going from an RDD to a DataFrame.
    # Perhaps you're importing data from a real database. Or you are using structured streaming
    # to get your data.

    # Let's split our data into training data and testing data
    trainTest = df.randomSplit([0.5, 0.5])
    trainingDF = trainTest[0]
    testDF = trainTest[1]

    # Now create our linear regression model
    lir = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

    # Train the model using our training data
    model = lir.fit(trainingDF)

    # Now see if we can predict values in our test data.
    # Generate predictions using our linear regression model for all features in our
    # test dataframe:
    fullPredictions = model.transform(testDF).cache()

    # Extract the predictions and the "known" correct labels.
    predictions = fullPredictions.select("prediction").rdd.map(lambda x: x[0])
    labels = fullPredictions.select("label").rdd.map(lambda x: x[0])

    # Zip them together
    predictionAndLabel = predictions.zip(labels).collect()

    # Print out the predicted and actual values for each point
    for prediction in predictionAndLabel:
      print(prediction)


    # Stop the session
    spark.stop()


(-2.4404683061101764, -3.54)
(-2.3125552232173225, -3.23)
(-2.0496227750486797, -2.89)
(-1.8222217387947177, -2.58)
(-1.6943086559018645, -2.54)
(-1.6587772439871826, -2.43)
(-1.7227337854336098, -2.36)
(-1.5521830082431383, -2.27)
(-1.523757878711393, -2.22)
(-1.4029510782014758, -1.96)
(-1.3745259486697305, -1.94)
(-1.2963568424574312, -1.91)
(-1.3176756896062403, -1.91)
(-1.381632231052667, -1.91)
(-1.4029510782014758, -1.87)
(-1.1897626067133866, -1.83)
(-1.3105694072233038, -1.82)
(-1.3034631248403676, -1.8)
(-1.225294018628068, -1.79)
(-1.175550041947514, -1.74)
(-1.1115935005010873, -1.68)
(-1.033424394288788, -1.67)
(-1.1400186300328325, -1.66)
(-1.1613374771816414, -1.66)
(-1.2110814538621955, -1.66)
(-1.3034631248403676, -1.64)
(-1.2181877362451319, -1.61)
(-1.0902746533522785, -1.6)
(-1.12580606526696, -1.59)
(-1.147124912415769, -1.59)
(-1.1684437595645776, -1.58)
(-1.012105547139979, -1.57)
(-1.0547432414375968, -1.54)
(-0.9765741352252976, -1.5)
(-0.9765741352252976, -1.4

### Decision Trees

In [17]:
from __future__ import print_function

from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

if __name__ == "__main__":

    # Create a SparkSession (Note, the config section is only for Windows!)
    spark = SparkSession.builder.appName("DecisionTree").getOrCreate()

    
    # Load up data as dataframe
    data = spark.read.option("header", "true").option("inferSchema", "true")\
        .csv("realestate.csv")

    assembler = VectorAssembler().setInputCols(["HouseAge", "DistanceToMRT", \
                               "NumberConvenienceStores"]).setOutputCol("features")
    
    df = assembler.transform(data).select("PriceOfUnitArea", "features")

    # Let's split our data into training data and testing data
    trainTest = df.randomSplit([0.5, 0.5])
    trainingDF = trainTest[0]
    testDF = trainTest[1]

    # Now create our decision tree
    dtr = DecisionTreeRegressor().setFeaturesCol("features").setLabelCol("PriceOfUnitArea")

    # Train the model using our training data
    model = dtr.fit(trainingDF)

    # Now see if we can predict values in our test data.
    # Generate predictions using our decision tree model for all features in our
    # test dataframe:
    fullPredictions = model.transform(testDF).cache()

    # Extract the predictions and the "known" correct labels.
    predictions = fullPredictions.select("prediction").rdd.map(lambda x: x[0])
    labels = fullPredictions.select("PriceOfUnitArea").rdd.map(lambda x: x[0])

    # Zip them together
    predictionAndLabel = predictions.zip(labels).collect()

    # Print out the predicted and actual values for each point
    for prediction in predictionAndLabel:
        print(prediction)


    # Stop the session
    spark.stop()


(39.68108108108107, 7.6)
(19.200000000000003, 11.2)
(17.73333333333333, 12.2)
(17.216666666666665, 12.8)
(23.9962962962963, 12.8)
(20.96, 12.9)
(15.3, 13.4)
(23.9962962962963, 13.7)
(23.9962962962963, 13.8)
(15.3, 14.4)
(17.216666666666665, 14.7)
(17.216666666666665, 15.4)
(17.73333333333333, 15.5)
(17.216666666666665, 15.9)
(23.9962962962963, 16.1)
(17.73333333333333, 17.4)
(17.216666666666665, 18.6)
(17.73333333333333, 18.8)
(23.9962962962963, 19.0)
(18.25, 20.7)
(29.49230769230769, 21.3)
(23.9962962962963, 21.5)
(29.49230769230769, 21.7)
(23.9962962962963, 21.8)
(15.3, 22.1)
(20.96, 22.3)
(29.500000000000014, 22.8)
(23.9962962962963, 22.9)
(29.49230769230769, 23.1)
(23.9962962962963, 23.2)
(29.49230769230769, 23.5)
(23.9962962962963, 23.7)
(33.53333333333334, 23.9)
(23.9962962962963, 24.7)
(17.73333333333333, 24.7)
(23.9962962962963, 24.8)
(29.49230769230769, 25.3)
(22.250000000000004, 25.3)
(23.9962962962963, 25.7)
(39.68108108108107, 26.5)
(39.68108108108107, 26.5)
(23.99629629629

# <ins>__Spark Streaming__<ins>

## Examples

### Streaming example1

In [26]:
# -*- coding: utf-8 -*-
"""
Created on Wed Dec 18 09:15:05 2019

@author: Frank
"""

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession

from pyspark.sql.functions import regexp_extract

# Create a SparkSession (the config bit is only for Windows!)
spark = SparkSession.builder.config("spark.sql.warehouse.dir", "file:///C:/tmp").appName("StructuredStreaming").getOrCreate()

# Monitor the logs directory for new log data, and read in the raw lines as accessLines
accessLines = spark.readStream.text("logs")

# Parse out the common log format to a DataFrame
contentSizeExp = r'\s(\d+)$'
statusExp = r'\s(\d{3})\s'
generalExp = r'\"(\S+)\s(\S+)\s*(\S*)\"'
timeExp = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'
hostExp = r'(^\S+\.[\S+\.]+\S+)\s'

logsDF = accessLines.select(regexp_extract('value', hostExp, 1).alias('host'),
                         regexp_extract('value', timeExp, 1).alias('timestamp'),
                         regexp_extract('value', generalExp, 1).alias('method'),
                         regexp_extract('value', generalExp, 2).alias('endpoint'),
                         regexp_extract('value', generalExp, 3).alias('protocol'),
                         regexp_extract('value', statusExp, 1).cast('integer').alias('status'),
                         regexp_extract('value', contentSizeExp, 1).cast('integer').alias('content_size'))

# Keep a running count of every access by status code
statusCountsDF = logsDF.groupBy(logsDF.status).count()

# Kick off our streaming query, dumping results to the console
query = ( statusCountsDF.writeStream.outputMode("complete").format("console").queryName("counts").start() )

# Run forever until terminated
query.awaitTermination()

# Cleanly shut down the session
spark.stop()



Py4JJavaError: An error occurred while calling o1277.text.
: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1218)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1423)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.spark.util.HadoopFSUtils$.listLeafFiles(HadoopFSUtils.scala:225)
	at org.apache.spark.util.HadoopFSUtils$.$anonfun$parallelListLeafFilesInternal$1(HadoopFSUtils.scala:95)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFilesInternal(HadoopFSUtils.scala:85)
	at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFiles(HadoopFSUtils.scala:69)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:158)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:131)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:94)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:66)
	at org.apache.spark.sql.execution.datasources.DataSource.createInMemoryFileIndex(DataSource.scala:567)
	at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$sourceSchema$2(DataSource.scala:268)
	at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:164)
	at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$1(DataSource.scala:164)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:169)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:262)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
	at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:34)
	at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:196)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:210)
	at org.apache.spark.sql.streaming.DataStreamReader.text(DataStreamReader.scala:343)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)


### Streaming example2

In [27]:
from pyspark.sql import SparkSession

import pyspark.sql.functions as func

# Create a SparkSession (the config bit is only for Windows!)
spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()

# Monitor the logs directory for new log data, and read in the raw lines as accessLines
accessLines = spark.readStream.text("logs")

# Parse out the common log format to a DataFrame
contentSizeExp = r'\s(\d+)$'
statusExp = r'\s(\d{3})\s'
generalExp = r'\"(\S+)\s(\S+)\s*(\S*)\"'
timeExp = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'
hostExp = r'(^\S+\.[\S+\.]+\S+)\s'

logsDF = accessLines.select(func.regexp_extract('value', hostExp, 1).alias('host'),
                         func.regexp_extract('value', timeExp, 1).alias('timestamp'),
                         func.regexp_extract('value', generalExp, 1).alias('method'),
                         func.regexp_extract('value', generalExp, 2).alias('endpoint'),
                         func.regexp_extract('value', generalExp, 3).alias('protocol'),
                         func.regexp_extract('value', statusExp, 1).cast('integer').alias('status'),
                         func.regexp_extract('value', contentSizeExp, 1).cast('integer').alias('content_size'))

logsDF2 = logsDF.withColumn("eventTime", func.current_timestamp())

# Keep a running count of endpoints
endpointCounts = logsDF2.groupBy(func.window(func.col("eventTime"), \
      "30 seconds", "10 seconds"), func.col("endpoint")).count()

sortedEndpointCounts = endpointCounts.orderBy(func.col("count").desc())

# Display the stream to the console
query = sortedEndpointCounts.writeStream.outputMode("complete").format("console") \
      .queryName("counts").start()

# Wait until we terminate the scripts
query.awaitTermination()

# Stop the session
spark.stop()



Py4JJavaError: An error occurred while calling o1282.text.
: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1218)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1423)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.spark.util.HadoopFSUtils$.listLeafFiles(HadoopFSUtils.scala:225)
	at org.apache.spark.util.HadoopFSUtils$.$anonfun$parallelListLeafFilesInternal$1(HadoopFSUtils.scala:95)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFilesInternal(HadoopFSUtils.scala:85)
	at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFiles(HadoopFSUtils.scala:69)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:158)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:131)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:94)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:66)
	at org.apache.spark.sql.execution.datasources.DataSource.createInMemoryFileIndex(DataSource.scala:567)
	at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$sourceSchema$2(DataSource.scala:268)
	at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:164)
	at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$1(DataSource.scala:164)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:169)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:262)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
	at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:34)
	at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:196)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:210)
	at org.apache.spark.sql.streaming.DataStreamReader.text(DataStreamReader.scala:343)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)


# <ins>__Coursera - SparkSQL__<ins>

## Objectives


Spark SQL is a Spark module for structured data processing. It is sed to query structured data inside Spark programs, using either SQL or a familiar DataFrame API.

After completing this lab you will be able to:


*   Load a data file into a dataframe
*   Create a Table View for the dataframe
*   Run basic SQL queries and aggregate data on the table view
*   Create a Pandas UDF to perform columnar operations


***


## Setup


For this lab, we are going to be using Python and Spark (PySpark). These libraries should be installed in your lab environment or in SN Labs. Pandas is a popular data science package for Python. In this lab, we use Pandas to load a CSV file from disc to a pandas dataframe in memory. PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context.


In [None]:
# Installing required packages
!pip install pyspark
!pip install findspark
!pip install pyarrow==0.14.1
!pip install pandas
!pip install numpy==1.19.5



ERROR: Could not find a version that satisfies the requirement pyarrow==0.14.1 (from versions: 0.9.0, 0.10.0, 0.11.0, 0.11.1, 0.12.0, 0.12.1, 0.13.0, 0.14.0, 0.15.1, 0.16.0, 0.17.0, 0.17.1, 1.0.0, 1.0.1, 2.0.0, 3.0.0, 4.0.0, 4.0.1, 5.0.0, 6.0.0, 6.0.1)
ERROR: No matching distribution found for pyarrow==0.14.1


Collecting numpy==1.19.5
  Downloading numpy-1.19.5-cp38-cp38-win_amd64.whl (13.3 MB)
Installing collected packages: numpy
  Attempting uninstall: numpy
    Found existing installation: numpy 1.19.2
    Uninstalling numpy-1.19.2:
      Successfully uninstalled numpy-1.19.2
Successfully installed numpy-1.19.5


In [None]:
import findspark
findspark.init()

In [6]:
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

## Exercise 1 -  Spark session


Create and initialize the Spark session needed to load the data frames and operate on it


#### Task 1: Creating the spark session and context


In [7]:
# Creating a spark context class
sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

RuntimeError: Java gateway process exited before sending its port number

#### Task 2: Initialize Spark session

To work with dataframes we just need to verify that the spark session instance has been created.


In [3]:
spark

## Exercise 2 - Loading the Data and creating a table view


In this section, you will first read the CSV file into a Pandas Dataframe and then read it into a Spark Dataframe
Pandas is a library used for data manipulation and analysis. The Pandas library offers data structures and operations for creating and manipulating Data Series and DataFrame objects. Data can be imported from various data sources, e.g., Numpy arrays, Python dictionaries, and CSV files. Pandas allows you to manipulate, organize and display the data.

To create a Spark DataFrame we load an external DataFrame, called `mtcars`. This DataFrame includes 32 observations on 11 variables:

| colIndex | colName | units/description                        |
| :------: | :------ | :--------------------------------------- |
|   [, 1]  | mpg     | Miles per gallon                         |
|   [, 2]  | cyl     | Number of cylinders                      |
|   [, 3]  | disp    | Displacement (cu.in.)                    |
|   [, 4]  | hp      | Gross horsepower                         |
|   [, 5]  | drat    | Rear axle ratio                          |
|   [, 6]  | wt      | Weight (lb/1000)                         |
|   [, 7]  | qsec    | 1/4 mile time                            |
|   [, 8]  | vs      | V/S                                      |
|   [, 9]  | am      | Transmission (0 = automatic, 1 = manual) |
|   [,10]  | gear    | Number of forward gears                  |
|   [,11]  | carb    | Number of carburetors                    |


#### Task 1: Load data into a Pandas DataFrame.

Pandas has a convenient function to load CSV data from a URL directly into a pandas dataframe.


In [4]:
# Read the file using `read_csv` function in pandas
mtcars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv')

In [5]:
# Preview a few records
mtcars.head()

Unnamed: 0.1,Unnamed: 0,mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb
0,Mazda RX4,21.0,6,160.0,110,3.9,2.62,16.46,0,1,4,4
1,Mazda RX4 Wag,21.0,6,160.0,110,3.9,2.875,17.02,0,1,4,4
2,Datsun 710,22.8,4,108.0,93,3.85,2.32,18.61,1,1,4,1
3,Hornet 4 Drive,21.4,6,258.0,110,3.08,3.215,19.44,1,0,3,1
4,Hornet Sportabout,18.7,8,360.0,175,3.15,3.44,17.02,0,0,3,2


In [6]:
mtcars.rename( columns={'Unnamed: 0':'name'}, inplace=True )

#### Task 2: Loading data into a Spark DataFrame


We use the `createDataFrame` function to load the data into a spark dataframe


In [7]:
sdf = spark.createDataFrame(mtcars) 

Let us look at the schema of the loaded spark dataframe


In [8]:
sdf.printSchema()

root
 |-- name: string (nullable = true)
 |-- mpg: double (nullable = true)
 |-- cyl: long (nullable = true)
 |-- disp: double (nullable = true)
 |-- hp: long (nullable = true)
 |-- drat: double (nullable = true)
 |-- wt: double (nullable = true)
 |-- qsec: double (nullable = true)
 |-- vs: long (nullable = true)
 |-- am: long (nullable = true)
 |-- gear: long (nullable = true)
 |-- carb: long (nullable = true)



#### Task 3: Create a Table View

Creating a table view in Spark SQL is required to run SQL queries programmatically on a DataFrame. A view is a temporary table to run SQL queries. A Temporary view provides local scope within the current Spark session. In this example we create a temporary view using the `createTempView()` function


In [9]:
sdf.createTempView("cars")

## Exercise 3 - Running SQL queries and aggregating data


Once we have a table view, we can run queries similar to querying a SQL table. We perform similar operations to the ones in the DataFrames notebook. Note the difference here however is that we use the SQL queries directly.


In [10]:
# Showing the whole table
spark.sql("SELECT * FROM cars").show()

+-------------------+----+---+-----+---+----+------------------+-----+---+---+----+----+
|               name| mpg|cyl| disp| hp|drat|                wt| qsec| vs| am|gear|carb|
+-------------------+----+---+-----+---+----+------------------+-----+---+---+----+----+
|          Mazda RX4|21.0|  6|160.0|110| 3.9|              2.62|16.46|  0|  1|   4|   4|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|             2.875|17.02|  0|  1|   4|   4|
|         Datsun 710|22.8|  4|108.0| 93|3.85|              2.32|18.61|  1|  1|   4|   1|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|             3.215|19.44|  1|  0|   3|   1|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15|              3.44|17.02|  0|  0|   3|   2|
|            Valiant|18.1|  6|225.0|105|2.76|              3.46|20.22|  1|  0|   3|   1|
|         Duster 360|14.3|  8|360.0|245|3.21|              3.57|15.84|  0|  0|   3|   4|
|          Merc 240D|24.4|  4|146.7| 62|3.69|              3.19| 20.0|  1|  0|   4|   2|
|           Merc 230|

In [11]:
# Showing a specific column
spark.sql("SELECT mpg FROM cars").show(5)

+----+
| mpg|
+----+
|21.0|
|21.0|
|22.8|
|21.4|
|18.7|
+----+
only showing top 5 rows



In [12]:
# Basic filtering query to determine cars that have a high mileage and low cylinder count
spark.sql("SELECT * FROM cars where mpg>20 AND cyl < 6").show(5)

+-----------+----+---+-----+---+----+-----+-----+---+---+----+----+
|       name| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------+----+---+-----+---+----+-----+-----+---+---+----+----+
| Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|  Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|
|   Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|
|   Fiat 128|32.4|  4| 78.7| 66|4.08|  2.2|19.47|  1|  1|   4|   1|
|Honda Civic|30.4|  4| 75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|
+-----------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows



In [13]:
# Aggregating data and grouping by cylinders
spark.sql("SELECT count(*), cyl from cars GROUP BY cyl").show()

+--------+---+
|count(1)|cyl|
+--------+---+
|       7|  6|
|      14|  8|
|      11|  4|
+--------+---+



## Exercise 4 - Create a Pandas UDF to apply a columnar operation

Apache Spark has become the de-facto standard in processing big data. To enable data scientists to leverage the value of big data, Spark added a Python API in version 0.7, with support for user-defined functions (UDF). These user-defined functions operate one-row-at-a-time, and thus suffer from high serialization and invocation overhead. As a result, many data pipelines define UDFs in Java and Scala and then invoke them from Python.

Pandas UDFs built on top of Apache Arrow bring you the *best of both worlds*—the ability to define low-overhead, high-performance UDFs entirely in Python. In this simple example, we will build a Scalar Pandas UDF to convert the wT column from imperial units (1000-lbs) to metric units (metric tons).

In addition, UDFs can be registered and invoked in SQL out of the box by registering a regular python function using the `@pandas_udf()` decorator. We can then apply this UDF to our `wt` column.


#### Task 1: Importing libraries and registering a UDF


In [14]:
# import the Pandas UDF function 
from pyspark.sql.functions import pandas_udf, PandasUDFType

In [15]:
@pandas_udf("float")
def convert_wt(s: pd.Series) -> pd.Series:
    # The formula for converting from imperial to metric tons
    return s * 0.45

spark.udf.register("convert_weight", convert_wt)

<function __main__.convert_wt(s: pandas.core.series.Series) -> pandas.core.series.Series>

#### Task 2: Applying the UDF to the tableview

We can now apply the `convert_weight` user-defined-function to our `wt` column from the `cars` table view. This is done very simply using the SQL query shown below. In this example below we show both the original weight (in ton-lbs) and converted weight (in metric tons).


In [16]:
spark.sql("SELECT *, wt AS weight_imperial, convert_weight(wt) as weight_metric FROM cars").show()

+-------------------+----+---+-----+---+----+------------------+-----+---+---+----+----+------------------+-------------+
|               name| mpg|cyl| disp| hp|drat|                wt| qsec| vs| am|gear|carb|   weight_imperial|weight_metric|
+-------------------+----+---+-----+---+----+------------------+-----+---+---+----+----+------------------+-------------+
|          Mazda RX4|21.0|  6|160.0|110| 3.9|              2.62|16.46|  0|  1|   4|   4|              2.62|        1.179|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|             2.875|17.02|  0|  1|   4|   4|             2.875|      1.29375|
|         Datsun 710|22.8|  4|108.0| 93|3.85|              2.32|18.61|  1|  1|   4|   1|              2.32|        1.044|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|             3.215|19.44|  1|  0|   3|   1|             3.215|      1.44675|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15|              3.44|17.02|  0|  0|   3|   2|              3.44|        1.548|
|            Valiant|18.

### Practice Questions


### Question 1 - Basic SQL operations


Display all Mercedez car rows from the `cars` table view we created earlier. The Mercedez cars have the prefix "Merc" in the car name column.


In [24]:
# Code block for learners to answer

Double-click **here** for a hint.

<!-- The hint is below:

The SQL query word `like` is used to identify patterns. 

-->


Double-click **here** for the solution.

<!-- The answer is below:

spark.sql("SELECT * FROM cars where name like 'Merc%'").show()

-->


### Question 2 - User Defined Functions


In this notebook, we created a UDF to convert weight from imperial to metric units. Now for this exercise, please create a pandas UDF to convert the `mpg` column to `kmpl` (kilometers per liter). You can use the conversion factor of 0.425.


In [None]:
# Code block for learners to answer

Double-click **here** for the solution.

<!-- The answer is below:

@pandas_udf("float")
def convert_mileage(s: pd.Series) -> pd.Series:
    # The formula for converting from imperial to metric tons
    return s * 0.425

spark.udf.register("convert_mileage", convert_mileage)

spark.sql("SELECT *, mpg AS mpg, convert_weight(mpg) as kmpl FROM cars").show()
-->
