In [1]:
# Reading data with spark #

#create spark session
from pyspark.sql import SparkSession

# SparkSession object
spark = SparkSession.builder.appName("Analyze London").getOrCreate()

# create Spark data frame from csv with spark.read()
data = spark.read \
    .format("csv") \
    .option("header", "true") \
    .load(r"D:\dev\large_dataset\spark-2-getting-started\02\demos\datasets\london_crime_by_lsoa.csv")

print(type(data))

<class 'pyspark.sql.dataframe.DataFrame'>


In [2]:
# check data schema for data frame 
data.printSchema()
print(data.dtypes)
print(data.describe)

root
 |-- lsoa_code: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- major_category: string (nullable = true)
 |-- minor_category: string (nullable = true)
 |-- value: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)

[('lsoa_code', 'string'), ('borough', 'string'), ('major_category', 'string'), ('minor_category', 'string'), ('value', 'string'), ('year', 'string'), ('month', 'string')]
<bound method DataFrame.describe of DataFrame[lsoa_code: string, borough: string, major_category: string, minor_category: string, value: string, year: string, month: string]>


In [3]:
# count the number of records -

# data.count()
data.limit(5).show()



+---------+----------+--------------------+--------------------+-----+----+-----+
|lsoa_code|   borough|      major_category|      minor_category|value|year|month|
+---------+----------+--------------------+--------------------+-----+----+-----+
|E01001116|   Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
|E01001646| Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
|E01000677|   Bromley|Violence Against ...|      Other violence|    0|2015|    5|
|E01003774| Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|E01004563|Wandsworth|             Robbery|   Personal Property|    0|2008|    6|
+---------+----------+--------------------+--------------------+-----+----+-----+



In [4]:
# drop rows having na
data.dropna()
data.show(5)

+---------+----------+--------------------+--------------------+-----+----+-----+
|lsoa_code|   borough|      major_category|      minor_category|value|year|month|
+---------+----------+--------------------+--------------------+-----+----+-----+
|E01001116|   Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
|E01001646| Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
|E01000677|   Bromley|Violence Against ...|      Other violence|    0|2015|    5|
|E01003774| Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|E01004563|Wandsworth|             Robbery|   Personal Property|    0|2008|    6|
+---------+----------+--------------------+--------------------+-----+----+-----+
only showing top 5 rows



In [5]:
# drop the columns which are not useful with data.drop(column)
data.drop('lsoa_code')

DataFrame[borough: string, major_category: string, minor_category: string, value: string, year: string, month: string]

In [6]:
# check unique values with distinct function
# select a column and call distinct on it

distinct_br = data.select('borough').distinct()

In [7]:
distinct_br.show(5)

+--------------------+
|             borough|
+--------------------+
|             Croydon|
|          Wandsworth|
|              Bexley|
|             Lambeth|
|Barking and Dagenham|
+--------------------+
only showing top 5 rows



In [8]:
# filter data - where clause

hackney_data = data.filter(data['borough'] == 'Hackney')
hackney_data.show(5)

+---------+-------+--------------------+--------------------+-----+----+-----+
|lsoa_code|borough|      major_category|      minor_category|value|year|month|
+---------+-------+--------------------+--------------------+-----+----+-----+
|E01001786|Hackney|     Criminal Damage|Criminal Damage T...|    0|2011|    6|
|E01001794|Hackney|Violence Against ...|          Harassment|    1|2013|    2|
|E01001787|Hackney|     Criminal Damage|Other Criminal Da...|    0|2011|    7|
|E01001738|Hackney|Violence Against ...|        Wounding/GBH|    0|2013|   12|
|E01001807|Hackney|  Theft and Handling|  Other Theft Person|    0|2016|    8|
+---------+-------+--------------------+--------------------+-----+----+-----+
only showing top 5 rows



In [9]:
# filter in clause

data_15_16 = data.filter(data['year'].isin(['2015', '2016']))
data_15_16.show(5)

+---------+---------+--------------------+--------------------+-----+----+-----+
|lsoa_code|  borough|      major_category|      minor_category|value|year|month|
+---------+---------+--------------------+--------------------+-----+----+-----+
|E01001116|  Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
|E01001646|Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
|E01000677|  Bromley|Violence Against ...|      Other violence|    0|2015|    5|
|E01003774|Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|E01004177|   Sutton|  Theft and Handling|Theft/Taking of P...|    1|2016|    8|
+---------+---------+--------------------+--------------------+-----+----+-----+
only showing top 5 rows



In [10]:
# filter - check just a fraction of data with sample function

f01 = data_15_16.sample(fraction=0.000001).show()

+---------+--------------------+------------------+--------------------+-----+----+-----+
|lsoa_code|             borough|    major_category|      minor_category|value|year|month|
+---------+--------------------+------------------+--------------------+-----+----+-----+
|E01003333|            Lewisham|   Criminal Damage|Criminal Damage T...|    0|2016|    1|
|E01002915|Kingston upon Thames|Theft and Handling|  Other Theft Person|    0|2016|    2|
+---------+--------------------+------------------+--------------------+-----+----+-----+



In [11]:
# grouping aggregating with ##
# group by clause - just like count(*) followed by group by, here it is groupBy('fieldname') and then count() to get the count

bcr = data.groupBy('borough').count()
bcr.show(5)

+--------------------+------+
|             borough| count|
+--------------------+------+
|             Croydon|602100|
|          Wandsworth|498636|
|              Bexley|385668|
|             Lambeth|519048|
|Barking and Dagenham|311040|
+--------------------+------+
only showing top 5 rows



In [12]:
#group by for more than on field at at iem

bcr = data.groupBy(['borough', 'value']).count()
bcr.show()

+--------------------+-----+-----+
|             borough|value|count|
+--------------------+-----+-----+
|              Camden|   23|   66|
|            Haringey|   18|   31|
|          Hillingdon|  131|    1|
|            Hounslow|   30|    2|
|Kingston upon Thames|   81|    2|
|             Hackney|   65|    1|
|Barking and Dagenham|    7|  371|
|              Barnet|    4| 3983|
|      City of London|    2|   84|
|              Camden|   26|   49|
|           Southwark|   23|   31|
|Kingston upon Thames|   33|   12|
|            Havering|   30|    5|
|      Waltham Forest|   25|    8|
|Kingston upon Thames|   31|   10|
|          Hillingdon|  102|    1|
|             Croydon|   45|    2|
|         Westminster|  217|    1|
|              Sutton|    1|43870|
|               Brent|    5| 2422|
+--------------------+-----+-----+
only showing top 20 rows



In [13]:
# use group by with other aggregations like sum with .agg function.
# so like in SQL call sum on a value and then aggregate it in backwareds order
# here sum is a built in function

bcr = data.groupBy('borough').agg( {"value":"sum"} )
bcr.show(5)

+--------------------+----------+
|             borough|sum(value)|
+--------------------+----------+
|             Croydon|  260294.0|
|          Wandsworth|  204741.0|
|              Bexley|  114136.0|
|             Lambeth|  292178.0|
|Barking and Dagenham|  149447.0|
+--------------------+----------+
only showing top 5 rows



In [14]:
# select a column and select it with as in sql - withColumnName
#select a as b - withColumnRenamed('borough', 'X')

data.select('borough').withColumnRenamed('borough','v').show(4)

# same for the above

bcr = data.groupBy('borough').agg({"value":"sum"}).withColumnRenamed('borough', 'X')


+---------+
|        v|
+---------+
|  Croydon|
|Greenwich|
|  Bromley|
|Redbridge|
+---------+
only showing top 4 rows



In [15]:
# select sum(a,v,b) without a group by
data.agg({'value': 'sum'}).show(4)

+----------+
|sum(value)|
+----------+
| 6447758.0|
+----------+



In [16]:
# total_conv = data.collect()[0][0]
################################################
#  collect function 

import pyspark.sql.functions as func


In [17]:
# func has round method - round(number, 2) - will round of to two decimal places
# orderBy
data.select(['borough', 'value']).orderBy('borough', ).show(4) # order by a column

data.select(['borough', 'value']).orderBy('borough', ascending=True).show(4) # order by a column ASC

data.select(['borough', 'value']).orderBy('borough', desc=True).show(4) # order by a column DESC


+--------------------+-----+
|             borough|value|
+--------------------+-----+
|Barking and Dagenham|    1|
|Barking and Dagenham|    0|
|Barking and Dagenham|    0|
|Barking and Dagenham|    0|
+--------------------+-----+
only showing top 4 rows

+--------------------+-----+
|             borough|value|
+--------------------+-----+
|Barking and Dagenham|    1|
|Barking and Dagenham|    0|
|Barking and Dagenham|    0|
|Barking and Dagenham|    0|
+--------------------+-----+
only showing top 4 rows

+--------------------+-----+
|             borough|value|
+--------------------+-----+
|Barking and Dagenham|    0|
|Barking and Dagenham|    0|
|Barking and Dagenham|    0|
|Barking and Dagenham|    1|
+--------------------+-----+
only showing top 4 rows



In [18]:
#select count(*), convictions, months group by con, months where year = 1020

data.filter(data['year']==2014).groupBy('month').agg({"value" : "sum"}).show(10)

#GROUP BY ALWAYS NEED AGGREGATION




+-----+----------+
|month|sum(value)|
+-----+----------+
|    7|   58564.0|
|   11|   59704.0|
|    3|   57669.0|
|    8|   55641.0|
|    5|   56327.0|
|    6|   57039.0|
|    9|   56933.0|
|    1|   55515.0|
|   10|   60537.0|
|    4|   53467.0|
+-----+----------+
only showing top 10 rows



In [19]:
###### more aggregation  and plotting #######

data.select('year').agg({"year":"min"}).show()
data.select('year').agg({"year":"max"}).show()

+---------+
|min(year)|
+---------+
|     2008|
+---------+

+---------+
|max(year)|
+---------+
|     2016|
+---------+



In [20]:
data.select('year').agg({"year":"max"}).show()

+---------+
|max(year)|
+---------+
|     2016|
+---------+



In [21]:
# sp_help - dtypes
data.dtypes

[('lsoa_code', 'string'),
 ('borough', 'string'),
 ('major_category', 'string'),
 ('minor_category', 'string'),
 ('value', 'string'),
 ('year', 'string'),
 ('month', 'string')]

In [22]:
#data.describe - will show min, max, count, etc on the frame- like in pandas

# to get information in matrix form: - crosstab function
# crosstab - pair wise frequency table of given columns

data.crosstab('borough', 'major_category').select('Drugs', 'borough_major_category').show(10)
#data.crosstab('borough', 'major_category').select('Drugs', 'Robbery', 'Burglary').show(10)



+-----+----------------------+
|Drugs|borough_major_category|
+-----+----------------------+
|32616|              Havering|
|29160|                Merton|
|35424|              Haringey|
|37368|         Tower Hamlets|
|42336|               Bromley|
|44064|               Enfield|
|22140|  Kingston upon Thames|
|32616|           Westminster|
|23004|  Richmond upon Thames|
|43740|              Lewisham|
+-----+----------------------+
only showing top 10 rows



In [23]:
################ accumulators & broadcast variables ######################

# Scala utilies closures - function pointers
# function can return another function as return value
# input argument to a function can also be a function

# what is closure:
# if there is a nested function, then inner function has access to the variable in the outter function
# this access persist even if the outter function scope as vanished
# Note: the inner function can is called directly from outside 

# these called inner functions carry the copies of the local variable of the outter functions with them around
# this is called closure

####>>>> closure -> boradcast -> accumulators => # in the notes #

# from pyspark.sql.functions import broadcast
# from pyspark.sql.functions import udf -> user defined functions


# usually peform join operations with broadcast for efficiencies



In [29]:
# using broadcast

import pyspark.sql
from pyspark.sql.functions import broadcast


ss = SparkSession.builder.appName("X").getOrCreate()
# RFOL - read format options load
d1 = ss.read\
    .format('csv') \
    .option('header', 'true') \
    .load(r"D:\dev\large_dataset\spark-2-getting-started\02\demos\datasets\player.csv")

d2 = ss.read.format('csv').option('header', 'true').load(r"D:\dev\large_dataset\spark-2-getting-started\02\demos\datasets\player_attributes.csv")

# d2id = d2.select('id').show(4) - broken fix it
# d1.select('id').join(d2id,'id', 'inner')

# create Spark data frame from csv with spark.read()
# borad

# d2.count()


In [31]:
#### saving data frames 

# coalesce - function to repartition data frame to write out to the file - argument 1 does that
# if you pass 2 to the coalesce function then each partion will write out a single file.

# due to coalesce function all the partitions are merged together and only file is written 
d1.select('id').coalesce(1)\
.write\
.option("header", "true")\
.csv(r"d:/dev/a.csv")

In [32]:
# this will write multiple files, as many data partions were there
d1.select('id') \
.write\
.option("header", "true")\
.csv(r"d:/dev/a.csv")

In [None]:
#################### SPARK SQL #########################################

In [33]:
#############################################################################
########################### HOW TO CREATE SPARK CONTEXT #####################
######## BUT YOU DO NOT NEED THIS. ###########################################
sc = spark.sparkContext

########//########//########//########//########// Register the DataFrame as a SQL temporary view
d1.createOrReplaceTempView("tbl1")

In [None]:
## runing a sql on the dataframe

# with spark session sql context is already available, use it directly
# YOU DO NOT NEED SQL CONTEXT TO QUERY - SS CAN DO IT DIRECTLY AS LONG AS THE DATAFRAM IS REGISTERED AS A TABLE.
print(type(ss)) 
ss.sql("select id, player_name, birthday from tbl1").show(3)

In [None]:
###########################################################################################
############### registering the table as global table - available for all the sessions ####
###########################################################################################

# d1.createOrReplaceTempView("d1")
d1.createOrReplaceGlobalTempView("gt")
print(type(gt))

In [None]:
### TO ACCESS global tables you need absolute name like 
#### global_temp.<table_name>

ss.sql("select * from global_temp.gt").show(3)

In [None]:
d1.schema

In [None]:
d1.select('id').write.save(r'd:/dev/new')

In [None]:
# ss.read.format('json').option().load(r"D:\dev\large_dataset\simple_json.json")
jd = spark.read.json(r"D:\dev\large_dataset\simple_json.json")
jd.show(4)
jd.schema
jd.category.schema