# Pyspark syntax

In [26]:
sc

<pyspark.context.SparkContext at 0x101734cf8>

In [27]:
sqlContext #spark data table context

<pyspark.sql.context.SQLContext at 0x107723278>

In [28]:
a = range(10)

In [29]:
list(a)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [30]:
rdd = sc.parallelize(a) #lazy send to spark
rdd

PythonRDD[89] at RDD at PythonRDD.scala:48

In [31]:
rdd.first()

0

In [32]:
rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [33]:
rdd =rdd.map(lambda x: x*10) #lazy! it return another rdd
rdd

PythonRDD[91] at RDD at PythonRDD.scala:48

In [34]:
rdd.reduce(lambda x,y:x+y) # not lazy (action)

450

In [35]:
rdd = rdd.filter(lambda x: x>30)
rdd

PythonRDD[93] at RDD at PythonRDD.scala:48

In [36]:
rdd.collect()

[40, 50, 60, 70, 80, 90]

## Exercises
1. Get an RDD with numbers 2 to 10
2. Get all elements that are bigger than 5
3. Get the product of the elements of the result of 2

In [37]:
rdd =  sc.parallelize(range(2,11))
print(rdd.collect())
rdd = rdd.filter(lambda x: x >5)
print(rdd.collect())
rdd = rdd.reduce(lambda x,y: x*y)
print(rdd)

[2, 3, 4, 5, 6, 7, 8, 9, 10]
[6, 7, 8, 9, 10]
30240


In [None]:
[1,2,3,4] -> [1,4,9,16] -> 1*4=4 9*16=144  144*4 =   

## Input

In [42]:
allcsv = sc.textFile("./*.csv") #depends where you are running! /data/*.csv if you are in bigdata1

In [43]:
allcsv.first()

'Crime ID,Month,Reported by,Falls within,Longitude,Latitude,Location,LSOA code,LSOA name,Crime type,Last outcome category,Context'

In [None]:
allcsv.map(lambda x: x.split(",")).first()

## Tuples and ReduceByKey
First element of a tuple is considered as key

In [None]:
data = [['Alexandra','31','F','Python'],['Carla','25','F','C'],['Max','18','M','Scala'],['Tom','34','M','C'],['Philip','28','M','Python'],['Lucy','25','F','Scala'],['Al','18','M','Scala'],['Grace','34','F','Python']]

In [None]:
data

In [None]:
RDD = sc.parallelize(data) #send data to spark

In [None]:
help(RDD.reduceByKey) #exit with q

In [None]:
sumByGender = RDD.map(lambda t: (t[2],1)).reduceByKey(lambda x,y: x+y)
sumByGender.collect()

In [None]:
languageAndAge = RDD.map(lambda t: (t[3],int(t[1])))
languageAndAge.collect()

In [None]:
languageAndAge.reduceByKey(lambda x,y:x+y).collect()

## Getting the average

In [None]:
temp = RDD.map(lambda t: (t[3],(int(t[1]),1)))
temp.collect()

In [None]:
temp2 = temp.reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1]))
temp2.collect()

In [None]:
temp2.map(lambda x:(x[0],x[1][0]/x[1][1])).collect()

## Exercise
1. Compute the average age by gender (the key is the first element in the tuple)
2. Compute the preferred language by gender (use a tuple as a key)

In [None]:
temp = RDD.map(lambda t: (t[2],(int(t[1]),1)))
temp.collect()

In [None]:
temp2 = temp.reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1]))
temp2.collect()

In [None]:
temp2.map(lambda x: (x[0],x[1][0]/x[1][1])).collect()

Compute the preferred language by gender (use a tuple as a key)

In [None]:
temp = RDD.map(lambda t: ((t[2],t[3]),1))
temp.collect()

In [None]:
temp2 = temp.reduceByKey(lambda x,y: x+y)
temp2.collect()

In [None]:
temp3 = temp2.map(lambda x: (x[0][0],(x[0][1],x[1])))
temp3.collect()

In [None]:
temp3.reduceByKey(max).collect()

Number of clusters:

In [None]:
sc._jsc.sc().getExecutorMemoryStatus().size()

# Pyspark SQL and Dataframes

In [40]:
df = sqlContext.read.format('com.databricks.spark.csv').options(delimiter=',',header='true', inferschema='true',mode="FAILFAST").load('./crime.csv')

In [41]:
df.show()

+--------------------+-------+--------------------+--------------------+---------+---------+--------------------+---------+--------------------+--------------------+---------------------+-------+
|            Crime ID|  Month|         Reported by|        Falls within|Longitude| Latitude|            Location|LSOA code|           LSOA name|          Crime type|Last outcome category|Context|
+--------------------+-------+--------------------+--------------------+---------+---------+--------------------+---------+--------------------+--------------------+---------------------+-------+
|6ce50abd0bf1ca408...|2016-12|Avon and Somerset...|Avon and Somerset...|-2.511571|51.414895|On or near Orchar...|E01014399|Bath and North Ea...|Criminal damage a...|  Under investigation|   null|
|6e15f8dd5c88a65c2...|2016-12|Avon and Somerset...|Avon and Somerset...|-2.516919|51.423683|    On or near A4175|E01014399|Bath and North Ea...|Violence and sexu...|  Under investigation|   null|
|2594621f67f0a2192..

In [3]:
df.select('Month','Crime type').show()

+-------+--------------------+
|  Month|          Crime type|
+-------+--------------------+
|2016-12|Criminal damage a...|
|2016-12|Violence and sexu...|
|2016-12|Violence and sexu...|
|2016-12|Violence and sexu...|
|2016-12|         Other crime|
|2016-12|Anti-social behav...|
|2016-12|Anti-social behav...|
|2016-12|Anti-social behav...|
|2016-12|Anti-social behav...|
|2016-12|       Bicycle theft|
|2016-12|Criminal damage a...|
|2016-12|         Other theft|
|2016-12|         Other theft|
|2016-12|         Other theft|
|2016-12|         Shoplifting|
|2016-12|Violence and sexu...|
|2016-12|Violence and sexu...|
|2016-12|Violence and sexu...|
|2016-12|Anti-social behav...|
|2016-12|Anti-social behav...|
+-------+--------------------+
only showing top 20 rows



In [4]:
df.printSchema()

root
 |-- Crime ID: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Reported by: string (nullable = true)
 |-- Falls within: string (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- LSOA code: string (nullable = true)
 |-- LSOA name: string (nullable = true)
 |-- Crime type: string (nullable = true)
 |-- Last outcome category: string (nullable = true)
 |-- Context: string (nullable = true)



In [5]:
# convert A as string
from pyspark.sql.types import *
df.withColumn('Latitude',df['Latitude'].cast(StringType()))

DataFrame[Crime ID: string, Month: string, Reported by: string, Falls within: string, Longitude: double, Latitude: string, Location: string, LSOA code: string, LSOA name: string, Crime type: string, Last outcome category: string, Context: string]

In [6]:
df = df.withColumn('Month',df['Month'].cast(DateType()))
df.printSchema()

root
 |-- Crime ID: string (nullable = true)
 |-- Month: date (nullable = true)
 |-- Reported by: string (nullable = true)
 |-- Falls within: string (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- LSOA code: string (nullable = true)
 |-- LSOA name: string (nullable = true)
 |-- Crime type: string (nullable = true)
 |-- Last outcome category: string (nullable = true)
 |-- Context: string (nullable = true)



In [7]:
from pyspark.sql.functions import mean, min, max
df.select(min('Month'),max('Month')).show()

+----------+----------+
|min(Month)|max(Month)|
+----------+----------+
|2016-12-01|2016-12-01|
+----------+----------+



see https://databricks.com/blog/2015/06/02/statistical-and-mathematical-functions-with-dataframes-in-spark.html

In [8]:
df.stat.crosstab("LSOA name", "Crime Type").show() 

+--------------------+---------------------+-------------+--------+-------------------------+-----+-----------+-----------+---------------------+------------+-------+-----------+---------------------+-------------+----------------------------+
|LSOA name_Crime Type|Anti-social behaviour|Bicycle theft|Burglary|Criminal damage and arson|Drugs|Other crime|Other theft|Possession of weapons|Public order|Robbery|Shoplifting|Theft from the person|Vehicle crime|Violence and sexual offences|
+--------------------+---------------------+-------------+--------+-------------------------+-----+-----------+-----------+---------------------+------------+-------+-----------+---------------------+-------------+----------------------------+
|South Gloucesters...|                    1|            0|       0|                        0|    0|          0|          2|                    0|           0|      0|          0|                    0|            0|                           5|
|         Mendip 002A|  

We can go to http://bigdata1.sheffield.ac.uk:50070/explorer.html#/data/ukpolice to see the hfs filesystem

In [9]:
# how to work with dates
#dates = ("2013-01-01",  "2015-07-01")
#date_from, date_to = [to_date(lit(s)).cast(TimestampType()) for s in dates]
#sf.where((sf.my_col > date_from) & (sf.my_col < date_to))

In [10]:
df.filter(df["Crime type"] == 'Burglary').select(df['Crime type'],df.Latitude,df.Longitude).show()

+----------+---------+---------+
|Crime type| Latitude|Longitude|
+----------+---------+---------+
|  Burglary| 51.41364|-2.498127|
|  Burglary|51.395315|-2.391594|
|  Burglary|51.392676|-2.350423|
|  Burglary|51.388973|-2.352608|
|  Burglary|51.391003|-2.356346|
|  Burglary|51.391003|-2.356346|
|  Burglary|51.386211|-2.359211|
|  Burglary|51.380421|-2.358907|
|  Burglary|51.386211|-2.359211|
|  Burglary|51.389609|-2.390367|
|  Burglary|51.383968| -2.36339|
|  Burglary|51.381522|-2.363543|
|  Burglary| 51.38013|-2.365745|
|  Burglary|51.383162|-2.368039|
|  Burglary|51.383414|-2.370958|
|  Burglary|51.390704|-2.319911|
|  Burglary|51.328447|-2.371088|
|  Burglary|51.323676|-2.369973|
|  Burglary| 51.37917|-2.392864|
|  Burglary|51.374852|-2.382799|
+----------+---------+---------+
only showing top 20 rows



In [11]:
df.filter(df["Crime type"] == 'Burglary').select(df['Crime type'],df.Latitude,df.Longitude).explain()

== Physical Plan ==
*Project [Crime type#9, Latitude#5, Longitude#4]
+- *Filter (isnotnull(Crime type#9) && (Crime type#9 = Burglary))
   +- *FileScan csv [Longitude#4,Latitude#5,Crime type#9] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/alessandro/Desktop/pyspark-tutorial-solved-master/crime.csv], PartitionFilters: [], PushedFilters: [IsNotNull(Crime type), EqualTo(Crime type,Burglary)], ReadSchema: struct<Longitude:double,Latitude:double,Crime type:string>


In [12]:
#or equivalently
sqlContext.registerDataFrameAsTable(df, "table1")
sqlContext.sql('select `Crime type` Latitude, Longitude from table1 where `Crime type` == "Burglary"').show()

+--------+---------+
|Latitude|Longitude|
+--------+---------+
|Burglary|-2.498127|
|Burglary|-2.391594|
|Burglary|-2.350423|
|Burglary|-2.352608|
|Burglary|-2.356346|
|Burglary|-2.356346|
|Burglary|-2.359211|
|Burglary|-2.358907|
|Burglary|-2.359211|
|Burglary|-2.390367|
|Burglary| -2.36339|
|Burglary|-2.363543|
|Burglary|-2.365745|
|Burglary|-2.368039|
|Burglary|-2.370958|
|Burglary|-2.319911|
|Burglary|-2.371088|
|Burglary|-2.369973|
|Burglary|-2.392864|
|Burglary|-2.382799|
+--------+---------+
only showing top 20 rows



In [13]:
sqlContext.sql('select `Crime type` Latitude, Longitude from table1 where `Crime type` == "Burglary"').explain()

== Physical Plan ==
*Project [Crime type#9 AS Latitude#265, Longitude#4]
+- *Filter (isnotnull(Crime type#9) && (Crime type#9 = Burglary))
   +- *FileScan csv [Longitude#4,Crime type#9] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/alessandro/Desktop/pyspark-tutorial-solved-master/crime.csv], PartitionFilters: [], PushedFilters: [IsNotNull(Crime type), EqualTo(Crime type,Burglary)], ReadSchema: struct<Longitude:double,Crime type:string>


## Caching

In [14]:
df.cache() # put the df in cache and results will be cached too (try to run a count twice after this)
df.count()

13750

In [15]:
df.count()

13750

In [16]:
# adding columns and keeping existing ones F.lit(0) return a column
from pyspark.sql import functions as F
df.withColumn('zero', F.lit(0))
df.select('Longitude','Latitude').withColumn('Longitude_times_two', df.Longitude * 2).show()

+---------+---------+-------------------+
|Longitude| Latitude|Longitude_times_two|
+---------+---------+-------------------+
|-2.511571|51.414895|          -5.023142|
|-2.516919|51.423683|          -5.033838|
|-2.511571|51.414895|          -5.023142|
|-2.495055|51.422132|           -4.99011|
|-2.509126|51.416137|          -5.018252|
|-2.498613|51.416002|          -4.997226|
|-2.497767|51.420232|          -4.995534|
| -2.49991|51.413623|           -4.99982|
| -2.49793|51.417966|           -4.99586|
|-2.494715|51.419948|           -4.98943|
|-2.498613|51.416002|          -4.997226|
|-2.501425|51.416692|           -5.00285|
|-2.497767|51.420232|          -4.995534|
|-2.497799|51.415233|          -4.995598|
| -2.49854|51.414618|           -4.99708|
|-2.504289| 51.41828|          -5.008578|
|-2.501425|51.416692|           -5.00285|
|-2.499922|51.417373|          -4.999844|
|-2.506762|51.409116|          -5.013524|
|-2.506762|51.409116|          -5.013524|
+---------+---------+-------------

In [17]:
from pyspark.sql.functions import col, first, last, sum, count, countDistinct, desc #*
# selecting columns, and creating new ones
df.select('Latitude', col('Latitude').alias('new_Lat'), (col('Longitude') < 0 ).alias('negative_long')).show()

+---------+---------+-------------+
| Latitude|  new_Lat|negative_long|
+---------+---------+-------------+
|51.414895|51.414895|         true|
|51.423683|51.423683|         true|
|51.414895|51.414895|         true|
|51.422132|51.422132|         true|
|51.416137|51.416137|         true|
|51.416002|51.416002|         true|
|51.420232|51.420232|         true|
|51.413623|51.413623|         true|
|51.417966|51.417966|         true|
|51.419948|51.419948|         true|
|51.416002|51.416002|         true|
|51.416692|51.416692|         true|
|51.420232|51.420232|         true|
|51.415233|51.415233|         true|
|51.414618|51.414618|         true|
| 51.41828| 51.41828|         true|
|51.416692|51.416692|         true|
|51.417373|51.417373|         true|
|51.409116|51.409116|         true|
|51.409116|51.409116|         true|
+---------+---------+-------------+
only showing top 20 rows



In [18]:
df.printSchema()

root
 |-- Crime ID: string (nullable = true)
 |-- Month: date (nullable = true)
 |-- Reported by: string (nullable = true)
 |-- Falls within: string (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- LSOA code: string (nullable = true)
 |-- LSOA name: string (nullable = true)
 |-- Crime type: string (nullable = true)
 |-- Last outcome category: string (nullable = true)
 |-- Context: string (nullable = true)



In [19]:
df.groupBy('Crime type').agg(first('Last outcome category').alias("status")).show()

+--------------------+--------------------+
|          Crime type|              status|
+--------------------+--------------------+
|       Bicycle theft| Under investigation|
|        Public order| Under investigation|
|               Drugs|Awaiting court ou...|
|         Other crime| Under investigation|
|             Robbery| Under investigation|
|Criminal damage a...| Under investigation|
|Theft from the pe...| Under investigation|
|         Shoplifting| Under investigation|
|            Burglary| Under investigation|
|         Other theft| Under investigation|
|Possession of wea...|Awaiting court ou...|
|Violence and sexu...| Under investigation|
|       Vehicle crime| Under investigation|
|Anti-social behav...|                null|
+--------------------+--------------------+



## Exercise
1. Show how many crimes we have for each crime type (hint: use groupby, agg and count)
2. Show how many *distinct*  'Last outcome category' we have for each Crime type
3. Show how many crimes we have for each LSOA code and crime type (hint: groupy by two keys)

In [20]:
df.groupBy('Crime type').agg(count('Crime type')).show() # df.groupBy('Crime type').agg(count).show()

+--------------------+-----------------+
|          Crime type|count(Crime type)|
+--------------------+-----------------+
|       Bicycle theft|              205|
|        Public order|             1304|
|               Drugs|              325|
|         Other crime|              146|
|             Robbery|              105|
|Criminal damage a...|             1366|
|Theft from the pe...|              127|
|         Shoplifting|              934|
|            Burglary|              914|
|         Other theft|             1136|
|Possession of wea...|               66|
|Violence and sexu...|             3754|
|       Vehicle crime|              842|
|Anti-social behav...|             2526|
+--------------------+-----------------+



In [21]:
df.groupBy('Crime type').agg(countDistinct('Last outcome category')).show()

+--------------------+-------------------------------------+
|          Crime type|count(DISTINCT Last outcome category)|
+--------------------+-------------------------------------+
|       Bicycle theft|                                    2|
|        Public order|                                   11|
|               Drugs|                                   10|
|         Other crime|                                    6|
|             Robbery|                                    4|
|Criminal damage a...|                                    9|
|Theft from the pe...|                                    4|
|         Shoplifting|                                   13|
|            Burglary|                                    4|
|         Other theft|                                    8|
|Possession of wea...|                                    7|
|Violence and sexu...|                                   12|
|       Vehicle crime|                                    5|
|Anti-social behav...|  

In [22]:
df

DataFrame[Crime ID: string, Month: date, Reported by: string, Falls within: string, Longitude: double, Latitude: double, Location: string, LSOA code: string, LSOA name: string, Crime type: string, Last outcome category: string, Context: string]

In [23]:
df.groupBy('Crime type','LSOA name').count().show()

+--------------------+--------------------+-----+
|          Crime type|           LSOA name|count|
+--------------------+--------------------+-----+
|Violence and sexu...|Bath and North Ea...|    3|
|Anti-social behav...|Bath and North Ea...|    3|
|             Robbery|Bath and North Ea...|    1|
|         Shoplifting|Bath and North Ea...|    1|
|            Burglary|Bath and North Ea...|    1|
|Violence and sexu...|        Bristol 001H|    2|
|Violence and sexu...|        Bristol 010D|    4|
|Violence and sexu...|        Bristol 012D|    2|
|Theft from the pe...|        Bristol 013A|    3|
|         Other theft|        Bristol 016A|    1|
|         Other theft|        Bristol 019B|    1|
|         Other theft|        Bristol 022C|    9|
|            Burglary|        Bristol 028A|    1|
|Anti-social behav...|        Bristol 030B|    2|
|Criminal damage a...|        Bristol 035E|    1|
|        Public order|        Bristol 039A|    3|
|Criminal damage a...|        Bristol 047D|    2|


In [24]:
df.sort(desc("Latitude")).select('Latitude').show()

+---------+
| Latitude|
+---------+
| 53.70861|
|52.489466|
|52.132956|
|52.132956|
|51.945433|
|51.886984|
|51.886984|
|51.886984|
|51.886984|
|51.698864|
|51.634454|
|51.634454|
|51.634454|
|51.634454|
|51.634454|
|51.633598|
|51.633598|
|51.630366|
|51.628035|
|51.628035|
+---------+
only showing top 20 rows



In [25]:
df.select('Latitude','Longitude').where(df['Latitude']>51.6).show()

+---------+---------+
| Latitude|Longitude|
+---------+---------+
|52.132956|-0.446839|
|52.132956|-0.446839|
|51.886984|-2.075333|
|51.886984|-2.075333|
|51.886984|-2.075333|
|51.886984|-2.075333|
| 53.70861|-1.678624|
|51.698864|-2.901494|
|51.945433|-0.259986|
|51.612107|-2.511244|
|51.612107|-2.511244|
|51.616651|-2.518618|
|51.613491|-2.511462|
|51.617423|-2.510755|
|51.617423|-2.510755|
|51.617423|-2.510755|
|51.617423|-2.510755|
| 51.61184|-2.516816|
|51.615619|-2.518303|
|51.614792|-2.516214|
+---------+---------+
only showing top 20 rows



## Exercise

1. show the LSOA names where the number of crimes is bigger than 100 (use groupby count and where)
2. sort them by count of crimes
3. see help(df.stat.freqItems) and show the crimes and lsoa name appearing  more than 30% (hint support is 0.3, use show(truncate=False) to see the result)

In [None]:
df.groupBy('LSOA name').count().dropna().where(col('count')>100).show()

In [None]:
df.groupBy('LSOA name').count().dropna().where(col('count')>100).sort(desc('count')).show()

In [None]:
help(df.stat.freqItems) #q to exit

In [None]:
df.stat.freqItems(['LSOA name','Crime type'],0.3).show(truncate=False)