In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

def IntegerSafe(value): # In case there are non-integer type to be converted.
    try:
        return int(value)
    except ValueError:
        return None

ss = SparkSession.builder.getOrCreate()
sc = ss.sparkContext

business = sc.textFile("../Data/filtered_registered_business_sf.csv")\
             .map(lambda x : x.split(','))\
             .map(lambda x : (IntegerSafe(x[0]), x[1], x[2], x[3], x[4]))
              

schema = StructType([ StructField("zip", IntegerType(), True),
                      StructField("name", StringType(), False),
                      StructField("street", StringType(), True),
                      StructField("city", StringType(), True),
                      StructField("state", StringType(), True)
                    ])

business_df = ss.createDataFrame(business, schema)


In [2]:
business_df.show(5)

+-----+--------------------+--------------------+-------------+-----+
|  zip|                name|              street|         city|state|
+-----+--------------------+--------------------+-------------+-----+
|94123|   Tournahu George L|   3301 Broderick St|San Francisco|   CA|
|94124|Stephens Institut...|    2225 Jerrold Ave|San Francisco|   CA|
|94105|Stephens Institut...|180 New Montgomer...|San Francisco|   CA|
|94108|Stephens Institut...|       540 Powell St|San Francisco|   CA|
|94107|Stephens Institut...|     460 Townsend St|San Francisco|   CA|
+-----+--------------------+--------------------+-------------+-----+
only showing top 5 rows



# print 5 zip code with the most businesses

In [3]:
business_df.groupBy('zip').count().orderBy('count', ascending =False).show(5)
                                                                          

+-----+-----+
|  zip|count|
+-----+-----+
|94110|12459|
|94103|10919|
|94109| 9623|
|94107| 9394|
|94102| 7962|
+-----+-----+
only showing top 5 rows



# Create a column named "onHoward" to see whether it is on Howard street

In [4]:
business_df.withColumn('onHoward', business_df['street'].contains('Howard')).filter('onHoward == True').show(5)

+-----+--------------------+--------------+-------------+-----+--------+
|  zip|                name|        street|         city|state|onHoward|
+-----+--------------------+--------------+-------------+-----+--------+
|94105|Stephens Institut...| 631 Howard St|San Francisco|   CA|    true|
|94103|Anderson Enterpri...|1525 Howard St|San Francisco|   CA|    true|
|94103|Avis Rent A Car S...| 821 Howard St|San Francisco|   CA|    true|
|94103|German Motors Cor...|1675 Howard St|San Francisco|   CA|    true|
|94103|German Motors Cor...|1675 Howard St|San Francisco|   CA|    true|
+-----+--------------------+--------------+-------------+-----+--------+
only showing top 5 rows



In [5]:
status = sc.textFile("../Data/status_million.csv")


In [6]:
from datetime import datetime
def toIntSafe(inval):
  try:
    return int(inval)
  except ValueError:
    return None

def toTimeSafe(inval):
  inval = inval.strip("\"") # Timestamp starting and ending with a double quotation mark.
  try:
    return datetime.strptime(inval, "%Y-%m-%d %H:%M:%S")
  except ValueError:
    return None


In [7]:
schema = StructType([ StructField("station_id", IntegerType(), False),
                      StructField("num_bikes_available", IntegerType(), True),
                      StructField("num_docks_available", IntegerType(), True),
                      StructField("timestamp", TimestampType(), True)
                    ])


In [8]:
status_transformed = status.map(lambda x : x.split(","))\
                           .map(lambda x : (int(x[0]), toIntSafe(x[1]), toIntSafe(x[2]), toTimeSafe(x[3])))


In [9]:
status_transformed.partitionBy(10).cache()

MapPartitionsRDD[21] at mapPartitions at PythonRDD.scala:133

In [10]:
status_df = ss.createDataFrame(status_transformed, schema)

In [11]:
status_df.show(5)

+----------+-------------------+-------------------+-------------------+
|station_id|num_bikes_available|num_docks_available|          timestamp|
+----------+-------------------+-------------------+-------------------+
|        10|                  7|                  8|2014-12-30 15:37:02|
|        10|                  7|                  8|2014-12-30 15:35:02|
|        10|                  7|                  8|2014-12-30 15:34:02|
|        10|                  7|                  8|2014-12-30 15:33:02|
|        10|                  7|                  8|2014-12-30 15:32:02|
+----------+-------------------+-------------------+-------------------+
only showing top 5 rows



In [12]:
status_df.filter('station_id == 10').orderBy('timestamp').show(5)

+----------+-------------------+-------------------+-------------------+
|station_id|num_bikes_available|num_docks_available|          timestamp|
+----------+-------------------+-------------------+-------------------+
|        10|                  9|                  6|2014-09-01 00:00:03|
|        10|                  9|                  6|2014-09-01 00:01:02|
|        10|                  9|                  6|2014-09-01 00:02:02|
|        10|                  9|                  6|2014-09-01 00:03:03|
|        10|                  9|                  6|2014-09-01 00:04:02|
+----------+-------------------+-------------------+-------------------+
only showing top 5 rows



# Return the current and previous number of bike at station_id, 10 at each time data was collected order by timestamp.

In [13]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *


In [14]:
status_df.filter('station_id == 10').\
        select('station_id', 'timestamp','num_bikes_available',lag('num_bikes_available', 1).\
               over(Window.partitionBy('station_id').orderBy('timestamp'))).show(5)
#                                             lag('num_bikes_available', 1).over(Window.partitionBy('station_id')\
#                                                                                .orderBy('timestamp')).show(5))

+----------+-------------------+-------------------+-----------------------------------------------------------------------------------------------------------------------+
|station_id|          timestamp|num_bikes_available|lag(num_bikes_available, 1, NULL) OVER (PARTITION BY station_id ORDER BY timestamp ASC NULLS FIRST unspecifiedframe$())|
+----------+-------------------+-------------------+-----------------------------------------------------------------------------------------------------------------------+
|        10|2014-09-01 00:00:03|                  9|                                                                                                                   null|
|        10|2014-09-01 00:01:02|                  9|                                                                                                                      9|
|        10|2014-09-01 00:02:02|                  9|                                                                                   

# Return the current and next number of bike at station_id, 10 at each time data was collected order by timestamp.


In [16]:
status_df.filter('station_id == 10').select('station_id', 'timestamp','num_bikes_available',\
                                            lead('num_bikes_available', 1).over(Window.partitionBy('station_id')\
                                                                               .orderBy('timestamp'))).show(5)

+----------+-------------------+-------------------+------------------------------------------------------------------------------------------------------------------------+
|station_id|          timestamp|num_bikes_available|lead(num_bikes_available, 1, NULL) OVER (PARTITION BY station_id ORDER BY timestamp ASC NULLS FIRST unspecifiedframe$())|
+----------+-------------------+-------------------+------------------------------------------------------------------------------------------------------------------------+
|        10|2014-09-01 00:00:03|                  9|                                                                                                                       9|
|        10|2014-09-01 00:01:02|                  9|                                                                                                                       9|
|        10|2014-09-01 00:02:02|                  9|                                                                              

# EXample 3

In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

def IntegerSafe(value): # In case there are non-integer type to be converted.
    try:
        return int(value)
    except ValueError:
        return None

ss = SparkSession.builder.getOrCreate()
sc = ss.sparkContext


business = sc.textFile("../Data/filtered_registered_business_sf.csv")\
             .map(lambda x : x.split(','))\
             .map(lambda x : (IntegerSafe(x[0]), x[1], x[2], x[3], x[4]))
              

schema = StructType([ StructField("zip", IntegerType(), True),
                      StructField("name", StringType(), False),
                      StructField("street", StringType(), True),
                      StructField("city", StringType(), True),
                      StructField("state", StringType(), True)
                    ])

business_df = ss.createDataFrame(business, schema)

# Create a UDF called check_sf which checks whether a given value contains “San Francisco” or “SF”.

In [23]:
from pyspark.sql.functions import * 

In [24]:
check_sf = udf(lambda x: ('San Francisco' in x) or ('SF' in x), BooleanType())

In [26]:
business_df.select('name', 'city', check_sf('city')).show(5)

+--------------------+-------------+--------------+
|                name|         city|<lambda>(city)|
+--------------------+-------------+--------------+
|   Tournahu George L|San Francisco|          true|
|Stephens Institut...|San Francisco|          true|
|Stephens Institut...|San Francisco|          true|
|Stephens Institut...|San Francisco|          true|
|Stephens Institut...|San Francisco|          true|
+--------------------+-------------+--------------+
only showing top 5 rows



In [27]:
def sf(x):
    if (("San Francisco" in x) or ("SF" in x)):
        return True
    else :
        return False

In [30]:
check_sf_udf = udf(sf)

In [33]:
business_df.select('name', 'city', check_sf_udf('city').alias('San Francisco')).show(5)

+--------------------+-------------+-------------+
|                name|         city|San Francisco|
+--------------------+-------------+-------------+
|   Tournahu George L|San Francisco|         true|
|Stephens Institut...|San Francisco|         true|
|Stephens Institut...|San Francisco|         true|
|Stephens Institut...|San Francisco|         true|
|Stephens Institut...|San Francisco|         true|
+--------------------+-------------+-------------+
only showing top 5 rows



# Example 4 Aggregate Function


# Calculate min, max, average num_bike_available per station_id.

In [36]:
status_df.groupBy('station_id').min('num_bikes_available').show(5)

+----------+------------------------+
|station_id|min(num_bikes_available)|
+----------+------------------------+
|        10|                       0|
|        11|                       0|
+----------+------------------------+



In [40]:
status_df.groupBy('station_id').agg(min('num_bikes_available').alias('min_bikes'), max('num_bikes_available').alias('max_bikes'),\
                                    avg('num_bikes_available').alias('avg_bikes')).show(5)

+----------+---------+---------+-----------------+
|station_id|min_bikes|max_bikes|        avg_bikes|
+----------+---------+---------+-----------------+
|        10|        0|       15|5.931166125246599|
|        11|        0|       19|7.768450198057421|
+----------+---------+---------+-----------------+



In [41]:
# Example 5 Join Types

In [44]:
d = sc.parallelize([("Alice",18),("Bob",20), ("Tom",40)])


In [45]:
d2 = sc.parallelize([("Bob",85), (None,80)])

In [46]:
schema1 = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)])

schema2 = StructType([
StructField("name", StringType(), True),
StructField("height", IntegerType(), True)])



In [48]:
df = ss.createDataFrame(d, schema1)
df2 = ss.createDataFrame(d2, schema2)

df.show()
df2.show()

+-----+---+
| name|age|
+-----+---+
|Alice| 18|
|  Bob| 20|
|  Tom| 40|
+-----+---+

+----+------+
|name|height|
+----+------+
| Bob|    85|
|null|    80|
+----+------+



# Inner Join

In [52]:
df.join(df2, 'name', 'inner').show()

+----+---+------+
|name|age|height|
+----+---+------+
| Bob| 20|    85|
+----+---+------+



In [54]:
df.join(df2, 'name').show() # inner is default 

+----+---+------+
|name|age|height|
+----+---+------+
| Bob| 20|    85|
+----+---+------+



In [56]:
df.join(df2, df.name == df2.name).select(df.name, df2.height, df.age).show()

+----+------+---+
|name|height|age|
+----+------+---+
| Bob|    85| 20|
+----+------+---+



# Outer Join 

In [58]:
df.join(df2, 'name', 'outer').show()

+-----+----+------+
| name| age|height|
+-----+----+------+
|  Tom|  40|  null|
| null|null|    80|
|  Bob|  20|    85|
|Alice|  18|  null|
+-----+----+------+



In [62]:
# df.show()
df2.show()

+----+------+
|name|height|
+----+------+
| Bob|    85|
|null|    80|
+----+------+



# LeftOuter

In [64]:
df.join(df2, 'name', 'left_outer').show()

+-----+---+------+
| name|age|height|
+-----+---+------+
|  Tom| 40|  null|
|  Bob| 20|    85|
|Alice| 18|  null|
+-----+---+------+



# Right Outer 

In [66]:
df.join(df2,'name', 'right_outer').show()

+----+----+------+
|name| age|height|
+----+----+------+
|null|null|    80|
| Bob|  20|    85|
+----+----+------+



# Left Semi Join 

In [73]:
df.join(df2, 'name', 'leftsemi').show() 

+----+---+
|name|age|
+----+---+
| Bob| 20|
+----+---+



# Example 6 Apply Inner left_outer and right outer Joins supervisor and bussiness 

# inner joins

In [76]:
supervisor = sc.textFile("../Data/supervisor_sf.csv")\
            .map(lambda x: x.split(',')).map(lambda x: (IntegerSafe(x[0]), IntegerSafe(x[1])))
schema = StructType([StructField('zip', IntegerType(), False),
                        StructField('id', IntegerType(), False)])
supervisor_df = ss.createDataFrame(supervisor, schema)

In [77]:
business_df.join(supervisor_df, 'zip').show(5)

+-----+--------------------+--------------------+-------------+-----+---+
|  zip|                name|              street|         city|state| id|
+-----+--------------------+--------------------+-------------+-----+---+
|94109|Stephens Institut...|1835-49 Van Ness Ave|San Francisco|   CA|  2|
|94109|Stephens Institut...|1835-49 Van Ness Ave|San Francisco|   CA|  6|
|94109|Stephens Institut...|1835-49 Van Ness Ave|San Francisco|   CA|  3|
|94109|Stephens Institut...|1835-49 Van Ness Ave|San Francisco|   CA|  5|
|94109|Stephens Institut...|        1055 Pine St|San Francisco|   CA|  2|
+-----+--------------------+--------------------+-------------+-----+---+
only showing top 5 rows



# Left outer 

In [78]:
business_df.join(supervisor_df, 'zip','left_outer').show(5)

+-----+--------------------+-------------------+-----------+-----+----+
|  zip|                name|             street|       city|state|  id|
+-----+--------------------+-------------------+-----------+-----+----+
| 2142|        Trecler Alan|        1 Rogers St|  Cambridge|   MA|null|
| 2142|Aegerion Pharmace...|        101 Main St|  Cambridge|   MA|null|
| 8086|         Aramsco Inc| 1480 Grandview Ave|  Thorofare|   NJ|null|
|18051|Five Thousand For...|       8020 Mine St|Fogelsville|   PA|null|
|21220|Social Solutions ...|425 Williams Ct 100|  Baltimore|   MD|null|
+-----+--------------------+-------------------+-----------+-----+----+
only showing top 5 rows



# Right outer

In [79]:
business_df.join(supervisor_df, 'zip','right_outer').show(5)

+-----+--------------------+--------------------+-------------+-----+---+
|  zip|                name|              street|         city|state| id|
+-----+--------------------+--------------------+-------------+-----+---+
|94109|Stephens Institut...|1835-49 Van Ness Ave|San Francisco|   CA|  2|
|94109|Stephens Institut...|        1055 Pine St|San Francisco|   CA|  2|
|94109|     Alioto F Co Inc|    440 Jefferson St|San Francisco|   CA|  2|
|94109|     Haines Robert D|   786-792 Sutter St|San Francisco|   CA|  2|
|94109|Avis Rent A Car S...|         675 Post St|San Francisco|   CA|  2|
+-----+--------------------+--------------------+-------------+-----+---+
only showing top 5 rows



# Left-semi

In [102]:
business_df.join(supervisor_df, 'zip','leftsemi').show(5)

ERROR! Session/line number was not unique in database. History logging moved to new session 605
+-----+--------------------+--------------------+-------------+-----+
|  zip|                name|              street|         city|state|
+-----+--------------------+--------------------+-------------+-----+
|94109|Stephens Institut...|1835-49 Van Ness Ave|San Francisco|   CA|
|94109|Stephens Institut...|        1055 Pine St|San Francisco|   CA|
|94109|     Alioto F Co Inc|    440 Jefferson St|San Francisco|   CA|
|94109|     Haines Robert D|   786-792 Sutter St|San Francisco|   CA|
|94109|Avis Rent A Car S...|         675 Post St|San Francisco|   CA|
+-----+--------------------+--------------------+-------------+-----+
only showing top 5 rows



In [103]:
business_df.join(supervisor_df, 'zip','outer').show(5)

+-----+--------------------+-------------------+-----------+-----+----+
|  zip|                name|             street|       city|state|  id|
+-----+--------------------+-------------------+-----------+-----+----+
| 2142|        Trecler Alan|        1 Rogers St|  Cambridge|   MA|null|
| 2142|Aegerion Pharmace...|        101 Main St|  Cambridge|   MA|null|
| 8086|         Aramsco Inc| 1480 Grandview Ave|  Thorofare|   NJ|null|
|18051|Five Thousand For...|       8020 Mine St|Fogelsville|   PA|null|
|21220|Social Solutions ...|425 Williams Ct 100|  Baltimore|   MD|null|
+-----+--------------------+-------------------+-----------+-----+----+
only showing top 5 rows



# Example 7 register_DF 

# Save Supservisor DataFrame as “Supervisor” and Business DataFrame as “Business”. And find supervisor id for "Holbert Deneice M"

In [105]:
business_df.write.saveAsTable('Business')

ERROR! Session/line number was not unique in database. History logging moved to new session 606


In [106]:
supervisor_df.write.saveAsTable('supervisor')

In [107]:
ss.sql('select id from Business JOIN supervisor on Business.zip == supervisor.zip\
    where name="Holbert Deneice M"').show()

ERROR! Session/line number was not unique in database. History logging moved to new session 607


# Example 8 Kil the spark context and re-read

In [97]:
# It will throw an error 

# Save Supservisor DataFrame as “Super” and Business DataFrame as “Busin”.

In [101]:
# business_df.write.option('path','/Users/efar/Downloads/spark/week-2').saveAsTable('Businesses')

ERROR! Session/line number was not unique in database. History logging moved to new session 601


In [108]:
# supervisor_df.write.option('path','/Users/efar/Downloads/spark/week-2').saveAsTable('Super')

In [109]:
# ss.sql('select id from Business JOIN supervisor on Businesses.zip == Super.zip\
#     where name="Holbert Deneice M"').show()

# Example 9 

# Read world_bank_project.json as a DataFrame.

In [110]:
world_bank_prj = ss.read.json("../Data/world_bank_project.json")

ERROR! Session/line number was not unique in database. History logging moved to new session 610


In [112]:
world_bank_prj.show(5)

+--------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+------------------+--------------------+------------------------+--------+-----------+-------+----------+--------------------+--------------------+----------------+---------------+--------------------+--------------------+--------------------+--------------------+-----------+--------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+------+------+--------------------+--------------------+--------------------+-----------+---------+------------+--------------------+
|                 _id|approvalfy|board_approval_month|   boardapprovaldate|            borrower|         clos

In [113]:
world_bank_prj.write.json("world_bank_project_2")

ERROR! Session/line number was not unique in database. History logging moved to new session 611


# Example 10 
# Read filtered_registered_business_sf.csv with infering schema.

In [114]:
business_df = ss.read.csv(
                          "../Data/filtered_registered_business_sf.csv",
                          inferSchema=True)


In [115]:
business_df.show(5)

+-----+--------------------+--------------------+-------------+---+
|  _c0|                 _c1|                 _c2|          _c3|_c4|
+-----+--------------------+--------------------+-------------+---+
|94123|   Tournahu George L|   3301 Broderick St|San Francisco| CA|
|94124|Stephens Institut...|    2225 Jerrold Ave|San Francisco| CA|
|94105|Stephens Institut...|180 New Montgomer...|San Francisco| CA|
|94108|Stephens Institut...|       540 Powell St|San Francisco| CA|
|94107|Stephens Institut...|     460 Townsend St|San Francisco| CA|
+-----+--------------------+--------------------+-------------+---+
only showing top 5 rows



In [116]:
from pyspark.sql.types import * 

In [117]:
business_schedma = StructType([StructField("zip", IntegerType(), True), 
                         StructField("name", StringType(), True),
                         StructField("street", StringType(), True), 
                         StructField("city", StringType(), True),
                         StructField("state", StringType(), True)])


In [118]:
business_df = ss.read.csv(
                          "../Data/filtered_registered_business_sf.csv",
                          schema=business_schedma)


In [119]:
business_df.show(5)

+-----+--------------------+--------------------+-------------+-----+
|  zip|                name|              street|         city|state|
+-----+--------------------+--------------------+-------------+-----+
|94123|   Tournahu George L|   3301 Broderick St|San Francisco|   CA|
|94124|Stephens Institut...|    2225 Jerrold Ave|San Francisco|   CA|
|94105|Stephens Institut...|180 New Montgomer...|San Francisco|   CA|
|94108|Stephens Institut...|       540 Powell St|San Francisco|   CA|
|94107|Stephens Institut...|     460 Townsend St|San Francisco|   CA|
+-----+--------------------+--------------------+-------------+-----+
only showing top 5 rows



In [120]:
business_df.write.csv("business_sf")

In [121]:
business_df.coalesce(1).write.csv("business_sf_2")


# Read Business and Supervisor parquet files generated from ex12

In [124]:
# business_df.write.saveAsTable('Busines')
# business_df = ss.read.parquet("Busines")

ERROR! Session/line number was not unique in database. History logging moved to new session 613
