# Spark Command Note

## Intro

Spark has two kinds of APIs and two modes of use.

* **APIs**
    * RDD API: lower level; use it when dealing with unstructured data
    * DataFrame API: comparable to a pandas DataFrame in Python
        * SparkSQL
* **2 modes**
    * shell
    * script
    
Each Hadoop (HDFS) block corresponds to one partition of an RDD.
One file corresponds to one RDD.

* **Pros of Spark over MapReduce**: The main advantage of Spark is that it can hold a portion of the original data in memory. It is also easier to write many kinds of algorithms.

## Installation on Mac

* **Steps**:
    1. download version spark-2.3.0-bin-hadoop2.7.tgz from [Spark downloads page](http://spark.apache.org/downloads.html)
    2. manually unzip the file
    3. sudo mv spark-2.3.0-bin-hadoop2.7 /opt/spark-2.3.0-bin-hadoop2.7
        * used sudo because we need permission to move files into the /opt folder
    4. ln -s /opt/spark-2.3.0-bin-hadoop2.7 /opt/spark
        * creates a shortcut (symbolic link) to the actual folder
    5. vi ~/.zshrc
        * with bash (the default shell on most Linux systems) the file is ~/.bashrc instead
        * I installed zsh for my command line, which is why I use this file
    6. add the following lines to the zshrc file
        * export SPARK_HOME=/opt/spark
        * export PATH=$SPARK_HOME/bin:$PATH
    7. create a new terminal tab, type `pyspark`
    8. link PySpark to Jupyter Notebook using the second method from the reference below
        * This is the more general way to use PySpark in a Jupyter Notebook: use the `findspark` package to make a Spark Context available in your code.
    9. run the sample code below to check that it works.
      
* **Reference**: [Get Started with PySpark and Jupyter Notebook in 3 Minutes](https://blog.sicara.com/get-started-pyspark-jupyter-guide-tutorial-ae2fe84f594f)

In [1]:
import findspark
findspark.init()


In [1]:
import pyspark
import random
sc = pyspark.SparkContext(appName="Pi")
num_samples = 100000000
def inside(p):     
  x, y = random.random(), random.random()
  return x*x + y*y < 1
count = sc.parallelize(range(0, num_samples)).filter(inside).count()
pi = 4 * count / num_samples
print(pi)
sc.stop()

3.14219904


In [24]:
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
sqlContext = SQLContext(sc)

[Stackoverflow](https://stackoverflow.com/questions/47665491/pyspark-throws-typeerror-textfile-missing-1-required-positional-argument-na)

In [3]:
# do not print anything unless it's an error
sc.setLogLevel("ERROR")

Nothing is computed until an action is called; transformations are lazy.

* **Transformations**: RDD to RDD
    * **map**: applies a function to every element of the RDD
    * **filter**: keeps only the elements for which the function returns True
    * **reduceByKey**: merges the values of each key; the result is still an RDD of (key, value) pairs, so it is a transformation
    * **flatMap**: one input element can produce multiple output elements
    * **sortByKey**: sorts a (key, value) RDD by key
    * **distinct**: gets the distinct values in the RDD
* **Actions**: RDD to various things
    * **collect**: brings all the data into the driver's memory
    * **take**: returns the first n elements of the RDD
    * **first**: returns the first element of the RDD
    * **reduce**: reduces the whole RDD to a single value, without a key
* To take a
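
The lazy behaviour of transformations can be imitated without Spark. The sketch below is a pure-Python analogy, not Spark code: the built-in `map` and `filter` return lazy iterators, much as RDD transformations only record the work to do, and `list(...)` plays the role of an action such as `collect`. The `add_one` helper and the `calls` log are made up for illustration.

```python
# Pure-Python analogy of lazy transformations (no Spark required)
calls = []  # records every element that add_one actually processes

def add_one(x):
    calls.append(x)
    return x + 1

data = [1, 2, 3, 4, 5]

# "Transformations": these only build a lazy pipeline, nothing runs yet
pipeline = map(add_one, data)
pipeline = filter(lambda x: x % 2 == 0, pipeline)
ran_before_action = len(calls)

# "Action": materializing the result forces the whole pipeline to run
result = list(pipeline)
ran_after_action = len(calls)

print(ran_before_action, ran_after_action, result)
```
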

There are two ways to create an RDD.
* Read from a file
* Create from scratch


> **Create RDD from scratch**

In [4]:
# the second parameter indicates the number of partitions in the RDD
myRDD = sc.parallelize([1,2,3,4,5], 2)

In [5]:
myRDD.getNumPartitions()

2

> **Read file**

In [6]:
myRDD2 = sc.textFile("data/Crimes_-_2001_to_present.csv")

In [7]:
myRDD2.take(2)

['ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location',
 '10078659,HY267429,05/19/2015 11:57:00 PM,010XX E 79TH ST,143A,WEAPONS VIOLATION,UNLAWFUL POSS OF HANDGUN,STREET,true,false,0624,006,8,44,15,1184626,1852799,2015,05/26/2015 12:42:06 PM,41.751242944,-87.599004724,"(41.751242944, -87.599004724)"']

> **map (transformation)**

Each row will be mapped to one output 

* row1 -> list1
* row2 -> list2

In [8]:
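# map returns a new RDD; it does not modify myRDD in place,
# so collecting myRDD still shows the original values
myRDD.map(lambda x: x+1)
myRDD.collect()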

[1, 2, 3, 4, 5]

In [9]:
def f(x):
    return x+1

In [10]:
myRDD.map(f).collect()

[2, 3, 4, 5, 6]

In [11]:
myRDD2.map(lambda x: x.split(",")).take(2)

[['ID',
  'Case Number',
  'Date',
  'Block',
  'IUCR',
  'Primary Type',
  'Description',
  'Location Description',
  'Arrest',
  'Domestic',
  'Beat',
  'District',
  'Ward',
  'Community Area',
  'FBI Code',
  'X Coordinate',
  'Y Coordinate',
  'Year',
  'Updated On',
  'Latitude',
  'Longitude',
  'Location'],
 ['10078659',
  'HY267429',
  '05/19/2015 11:57:00 PM',
  '010XX E 79TH ST',
  '143A',
  'WEAPONS VIOLATION',
  'UNLAWFUL POSS OF HANDGUN',
  'STREET',
  'true',
  'false',
  '0624',
  '006',
  '8',
  '44',
  '15',
  '1184626',
  '1852799',
  '2015',
  '05/26/2015 12:42:06 PM',
  '41.751242944',
  '-87.599004724',
  '"(41.751242944',
  ' -87.599004724)"']]
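
Note that splitting on every comma breaks the quoted `Location` field into two pieces (the last two entries above). One possible workaround, sketched here in plain Python on the sample row shown earlier, is the standard-library `csv` module, which respects the quotes. Applying it to an RDD (for instance through `mapPartitions`) is an assumption and is not run here.

```python
import csv

# Sample row copied from the output of myRDD2.take(2) above
line = ('10078659,HY267429,05/19/2015 11:57:00 PM,010XX E 79TH ST,143A,'
        'WEAPONS VIOLATION,UNLAWFUL POSS OF HANDGUN,STREET,true,false,'
        '0624,006,8,44,15,1184626,1852799,2015,05/26/2015 12:42:06 PM,'
        '41.751242944,-87.599004724,"(41.751242944, -87.599004724)"')

naive = line.split(",")            # also splits inside the quoted Location
parsed = next(csv.reader([line]))  # keeps the quoted Location as one field

print(len(naive), len(parsed))
print(parsed[-1])
```
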

> **flatMap (transformation)**

Each element of a row becomes its own output; the per-row lists are flattened into a single RDD.

* element1 in row1 -> output1
* element2 in row1 -> output2
* element3 in row1 -> output3
...

In [12]:
myRDD2.flatMap(lambda x: x.split(",")).take(2)

['ID', 'Case Number']

In [13]:
myRDD2.flatMap(lambda x: x.split(",")).map(lambda x: (x,1)).take(2)

[('ID', 1), ('Case Number', 1)]
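
To make the difference between `map` and `flatMap` concrete, here is a small pure-Python sketch (not Spark code). The two input strings are made-up sample rows; `itertools.chain.from_iterable` stands in for the flattening step that `flatMap` performs.

```python
from itertools import chain

rows = ["a,b,c", "d,e"]  # hypothetical input rows

# map: one output per row (a list of lists)
mapped = [row.split(",") for row in rows]

# flatMap: the per-row lists are flattened into one sequence
flat_mapped = list(chain.from_iterable(row.split(",") for row in rows))

print(mapped)
print(flat_mapped)
```
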

> **reduce (action)**

In the function passed to reduce, one argument is the running result (here, the running sum) and the other is the current element.

In [14]:
myRDD.reduce(lambda x,y: x+y)

15
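
Because an RDD is split into partitions, the reduce function is applied within each partition first and the partial results are combined afterwards, so it should be associative and commutative. The pure-Python sketch below imitates this with `functools.reduce`; the way the five numbers are split into two partitions is an assumption made for illustration.

```python
from functools import reduce

# Hypothetical split of [1, 2, 3, 4, 5] into two partitions
partitions = [[1, 2], [3, 4, 5]]

def add(x, y):
    return x + y

def sub(x, y):
    return x - y

# Addition: reducing each partition, then the partial results, gives the same total
partial_sums = [reduce(add, p) for p in partitions]
total = reduce(add, partial_sums)

# Subtraction is not associative: the partitioned result differs from the sequential one
sequential = reduce(sub, [1, 2, 3, 4, 5])
partitioned = reduce(sub, [reduce(sub, p) for p in partitions])

print(total, sequential, partitioned)
```
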

---

# Spark DataFrame API

> **Read csv file into Spark DataFrame**

In [51]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [162]:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('data/Crimes_-_2001_to_present.csv')

[databricks/spark-csv](https://github.com/databricks/spark-csv)

In [164]:
# the actual number of rows
df.count()

5801844

> **Create DataFrame from Scratch**

In [91]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
scratch_df = sqlContext.createDataFrame([(1, "A", [1,2,3]), (2, "B", [3,5])],["col1", "col2", "col3"])

In [92]:
scratch_df.show()

+----+----+---------+
|col1|col2|     col3|
+----+----+---------+
|   1|   A|[1, 2, 3]|
|   2|   B|   [3, 5]|
+----+----+---------+



> **Read Json into DataFrame**

In [97]:
customer = sqlContext.read.json("data/customer.json")

In [99]:
customer.take(3)

[Row(address=Row(city='New Orleans', state='LA', street='6649 N Blue Gum St', zip='70116'), first_name='James', last_name='Butterburg'),
 Row(address=Row(city='Brighton', state='MI', street='4 B Blue Ridge Blvd', zip='48116'), first_name='Josephine', last_name='Darakjy'),
 Row(address=Row(city='Bridgeport', state='NJ', street='8 W Cerritos Ave #54', zip='08014'), first_name='Art', last_name='Chemel')]

> **show records, head**

In [53]:
df.show(3)

+--------+-----------+--------------------+-------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|      ID|Case Number|                Date|              Block|IUCR|        Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+--------------------+-------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|10078659|   HY267429|05/19/2015 11:57:...|    010XX E 79TH ST|143A|   WEAPONS VIOLATION|UNLAWFUL POSS OF ...|              STR

In [54]:
df.head(3)

[Row(ID=10078659, Case Number='HY267429', Date='05/19/2015 11:57:00 PM', Block='010XX E 79TH ST', IUCR='143A', Primary Type='WEAPONS VIOLATION', Description='UNLAWFUL POSS OF HANDGUN', Location Description='STREET', Arrest=True, Domestic=False, Beat=624, District=6, Ward=8, Community Area='44', FBI Code='15', X Coordinate=1184626, Y Coordinate=1852799, Year=2015, Updated On='05/26/2015 12:42:06 PM', Latitude=41.751242944, Longitude=-87.599004724, Location='(41.751242944, -87.599004724)'),
 Row(ID=10078598, Case Number='HY267408', Date='05/19/2015 11:50:00 PM', Block='067XX N SHERIDAN RD', IUCR='3731', Primary Type='INTERFERENCE WITH PUBLIC OFFICER', Description='OBSTRUCTING IDENTIFICATION', Location Description='STREET', Arrest=True, Domestic=False, Beat=2432, District=24, Ward=49, Community Area='1', FBI Code='24', X Coordinate=1167071, Y Coordinate=1944859, Year=2015, Updated On='05/26/2015 12:42:06 PM', Latitude=42.004255918, Longitude=-87.660691083, Location='(42.004255918, -87.660

> **Take the first N rows of a Spark DataFrame and return a new DataFrame** (the example below uses N = 10)

[Stackoverflow](https://stackoverflow.com/questions/34206508/is-there-a-way-to-take-the-first-1000-rows-of-a-spark-dataframe)

In [77]:
df2 = df.limit(10).alias("test").persist()

In [80]:
df2.count()

10

In [82]:
df2.show(5)

+--------+-----------+--------------------+--------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|      ID|Case Number|                Date|               Block|IUCR|        Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+--------------------+--------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|10078659|   HY267429|05/19/2015 11:57:...|     010XX E 79TH ST|143A|   WEAPONS VIOLATION|UNLAWFUL POSS OF ...|             

> **Get Column Types**

In [144]:
df2

DataFrame[ID: int, Case Number: string, Date: string, Block: string, IUCR: string, Primary Type: string, Description: string, Location Description: string, Arrest: boolean, Domestic: boolean, Beat: int, District: int, Ward: int, Community Area: string, FBI Code: string, X Coordinate: int, Y Coordinate: int, Year: int, Updated On: string, Latitude: double, Longitude: double, Location: string]

In [86]:
df2.dtypes

[('ID', 'int'),
 ('Case Number', 'string'),
 ('Date', 'string'),
 ('Block', 'string'),
 ('IUCR', 'string'),
 ('Primary Type', 'string'),
 ('Description', 'string'),
 ('Location Description', 'string'),
 ('Arrest', 'boolean'),
 ('Domestic', 'boolean'),
 ('Beat', 'int'),
 ('District', 'int'),
 ('Ward', 'int'),
 ('Community Area', 'string'),
 ('FBI Code', 'string'),
 ('X Coordinate', 'int'),
 ('Y Coordinate', 'int'),
 ('Year', 'int'),
 ('Updated On', 'string'),
 ('Latitude', 'double'),
 ('Longitude', 'double'),
 ('Location', 'string')]

> **Get the shape of dataframe**

**Number of Rows**

In [41]:
df.count()

5801844

**Number of Columns**

In [None]:
len(df.columns)

> **Describe the dataset; Get summary statistics**

In [43]:
df.describe().show()

+-------+------------------+------------------+--------------------+--------------------+-----------------+-----------------+---------------+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+-------------------+-------------------+--------------------+
|summary|                ID|       Case Number|                Date|               Block|             IUCR|     Primary Type|    Description|Location Description|              Beat|          District|              Ward|    Community Area|          FBI Code|      X Coordinate|      Y Coordinate|              Year|          Updated On|           Latitude|          Longitude|            Location|
+-------+------------------+------------------+--------------------+--------------------+-----------------+-----------------+---------------+--------------------+------------------+------------------+------

> **Select Column**

In [44]:
df.select('ID','Date').show(5)

+--------+--------------------+
|      ID|                Date|
+--------+--------------------+
|10078659|05/19/2015 11:57:...|
|10078598|05/19/2015 11:50:...|
|10078625|05/19/2015 11:47:...|
|10078662|05/19/2015 11:46:...|
|10078584|05/19/2015 11:45:...|
+--------+--------------------+
only showing top 5 rows



> **Get unique value**

In [45]:
df.columns

['ID',
 'Case Number',
 'Date',
 'Block',
 'IUCR',
 'Primary Type',
 'Description',
 'Location Description',
 'Arrest',
 'Domestic',
 'Beat',
 'District',
 'Ward',
 'Community Area',
 'FBI Code',
 'X Coordinate',
 'Y Coordinate',
 'Year',
 'Updated On',
 'Latitude',
 'Longitude',
 'Location']

In [48]:
df.select('District').distinct().show()

+--------+
|District|
+--------+
|      31|
|      12|
|      22|
|    null|
|       1|
|      13|
|       6|
|      16|
|       3|
|      20|
|       5|
|      19|
|      15|
|       9|
|      17|
|       4|
|       8|
|      23|
|       7|
|      10|
+--------+
only showing top 20 rows



In [46]:
df.select('District').distinct().count()

27

> **Explode array data into rows in spark; Dividing complex rows of dataframe to simple rows in Pyspark; Explode**

The default input of the explode method is an array column, e.g. [1,2,3]. If the value in the row is not an array, we need to specify how to split it first. For example, if the value is the string "1,2,3", we need to write `explode(split(df.col3, ","))`.

* **First way**: use **withColumn**+**explode**
    * Note: if we want to add a new column, use this.

[StackOverFlow: Explode array data into rows in spark](https://stackoverflow.com/questions/44436856/explode-array-data-into-rows-in-spark)

* **Second way**: use **select**+**explode**+**alias**
    * select: to pick the columns to keep
    * explode: to separate the array into one row per element
    * alias: to specify the name of the new column
    * Note: use this if we want to select columns and add a new column at the same time.

[StackOverFlow: Explode in PySpark](https://stackoverflow.com/questions/38210507/explode-in-pyspark)
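
What explode does to each row can be sketched in plain Python (this is an illustration of the idea, not Spark code). The tuples mirror the small `col1`/`col2`/`col3` table used in this section; note that splitting a string yields string elements, not integers.

```python
# Rows with an array-like third column
rows = [(1, "A", [1, 2, 3]), (2, "B", [3, 5])]

# explode: one output row per element of the list
exploded = [(c1, c2, item) for c1, c2, items in rows for item in items]

# Rows where the third column is a comma-separated string instead
string_rows = [(1, "A", "1,2,3"), (2, "B", "3,5")]

# split first, then explode; the elements stay strings
exploded_from_strings = [
    (c1, c2, item) for c1, c2, text in string_rows for item in text.split(",")
]

print(exploded)
print(exploded_from_strings)
```
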

In [88]:
from pyspark.sql.functions import explode
explode_df = sqlContext.createDataFrame([(1, "A", [1,2,3]), (2, "B", [3,5])],["col1", "col2", "col3"])
explode_df.show()

+----+----+---------+
|col1|col2|     col3|
+----+----+---------+
|   1|   A|[1, 2, 3]|
|   2|   B|   [3, 5]|
+----+----+---------+



In [194]:
# overwrite the original col3
explode_df.withColumn("col3", explode(explode_df.col3)).show()
# create a new column
explode_df.withColumn("newcol3", explode(explode_df.col3)).show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   A|   1|
|   1|   A|   2|
|   1|   A|   3|
|   2|   B|   3|
|   2|   B|   5|
+----+----+----+

+----+----+---------+-------+
|col1|col2|     col3|newcol3|
+----+----+---------+-------+
|   1|   A|[1, 2, 3]|      1|
|   1|   A|[1, 2, 3]|      2|
|   1|   A|[1, 2, 3]|      3|
|   2|   B|   [3, 5]|      3|
|   2|   B|   [3, 5]|      5|
+----+----+---------+-------+



In [112]:
# address is a struct (not an array), so it cannot be exploded;
# select the nested field with dot notation instead
customer.select("first_name",
                "address.city").show()

+----------+-----------+
|first_name|       city|
+----------+-----------+
|     James|New Orleans|
| Josephine|   Brighton|
|       Art| Bridgeport|
+----------+-----------+



In [125]:
explode_df.select("col1",
                  "col2",
                  explode(explode_df.col3).alias("newcol3")).show()

+----+----+-------+
|col1|col2|newcol3|
+----+----+-------+
|   1|   A|      1|
|   1|   A|      2|
|   1|   A|      3|
|   2|   B|      3|
|   2|   B|      5|
+----+----+-------+



**Examples of non-list format column value**

In [115]:
explode_df2 = sqlContext.createDataFrame([(1, "A", "1,2,3"), (2, "B", "3,5")],["col1", "col2", "col3"])
explode_df2.show()

+----+----+-----+
|col1|col2| col3|
+----+----+-----+
|   1|   A|1,2,3|
|   2|   B|  3,5|
+----+----+-----+



In [127]:
from pyspark.sql.functions import split
explode_df2.withColumn("newcol3", explode(split(explode_df2.col3, ","))).show()

explode_df2.select("col1",
                   "col2",
                   explode(split(explode_df2.col3, ",")).alias("newcol3")).show()

+----+----+-----+-------+
|col1|col2| col3|newcol3|
+----+----+-----+-------+
|   1|   A|1,2,3|      1|
|   1|   A|1,2,3|      2|
|   1|   A|1,2,3|      3|
|   2|   B|  3,5|      3|
|   2|   B|  3,5|      5|
+----+----+-----+-------+

+----+----+-------+
|col1|col2|newcol3|
+----+----+-------+
|   1|   A|      1|
|   1|   A|      2|
|   1|   A|      3|
|   2|   B|      3|
|   2|   B|      5|
+----+----+-------+



> **Convert pyspark string to date format**

[StackOverFlow: Convert pyspark string to date format](https://stackoverflow.com/questions/38080748/convert-pyspark-string-to-date-format)

In [129]:
df2.take(2)

[Row(ID=10078659, Case Number='HY267429', Date='05/19/2015 11:57:00 PM', Block='010XX E 79TH ST', IUCR='143A', Primary Type='WEAPONS VIOLATION', Description='UNLAWFUL POSS OF HANDGUN', Location Description='STREET', Arrest=True, Domestic=False, Beat=624, District=6, Ward=8, Community Area='44', FBI Code='15', X Coordinate=1184626, Y Coordinate=1852799, Year=2015, Updated On='05/26/2015 12:42:06 PM', Latitude=41.751242944, Longitude=-87.599004724, Location='(41.751242944, -87.599004724)'),
 Row(ID=10078598, Case Number='HY267408', Date='05/19/2015 11:50:00 PM', Block='067XX N SHERIDAN RD', IUCR='3731', Primary Type='INTERFERENCE WITH PUBLIC OFFICER', Description='OBSTRUCTING IDENTIFICATION', Location Description='STREET', Arrest=True, Domestic=False, Beat=2432, District=24, Ward=49, Community Area='1', FBI Code='24', X Coordinate=1167071, Y Coordinate=1944859, Year=2015, Updated On='05/26/2015 12:42:06 PM', Latitude=42.004255918, Longitude=-87.660691083, Location='(42.004255918, -87.660

In [133]:
from pyspark.sql.functions import to_timestamp

In [138]:
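# note: 'HH:mm:ss' reads the hour on a 24-hour clock and ignores the trailing AM/PM,
# so 11:57:00 PM comes out as 11:57:00; 'MM/dd/yyyy hh:mm:ss a' would keep the PM
df2.select("ID",
           to_timestamp(df2.Date, 'MM/dd/yyyy HH:mm:ss').alias('dt')).show()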

+--------+-------------------+
|      ID|                 dt|
+--------+-------------------+
|10078659|2015-05-19 11:57:00|
|10078598|2015-05-19 11:50:00|
|10078625|2015-05-19 11:47:00|
|10078662|2015-05-19 11:46:00|
|10078584|2015-05-19 11:45:00|
|10078629|2015-05-19 11:40:00|
|10079225|2015-05-19 11:30:00|
|10078594|2015-05-19 11:30:00|
|10080768|2015-05-19 11:30:00|
|10078618|2015-05-19 11:30:00|
+--------+-------------------+
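
The timestamps above read 11:57:00 although the source value is 11:57:00 PM, because the pattern has no AM/PM marker. The effect of adding one can be checked in plain Python with `datetime.strptime`; this is only an analogy, since Python's `%I`/`%p` directives use a different syntax from Spark's date patterns.

```python
from datetime import datetime

raw = "05/19/2015 11:57:00 PM"  # first Date value in the data above

# %I is the hour on a 12-hour clock and %p is the AM/PM marker
parsed = datetime.strptime(raw, "%m/%d/%Y %I:%M:%S %p")

print(parsed)
```
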



In [141]:
df3 = df2.withColumn("datetime",
           to_timestamp(df2.Date, 'MM/dd/yyyy HH:mm:ss'))

In [142]:
df3.show()

+--------+-----------+--------------------+--------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+-------------------+
|      ID|Case Number|                Date|               Block|IUCR|        Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|           datetime|
+--------+-----------+--------------------+--------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+-------------------+
|10078659|   HY267429|05/19/2015 11:57:...|     010XX E 79TH ST|

> **Spark DataFrame TimestampType - how to get Year, Month, Day values from field?**

* [StackOverFlow: year/month/dayofmonth](https://stackoverflow.com/questions/30949202/spark-dataframe-timestamptype-how-to-get-year-month-day-values-from-field)
* [StackOverFlow: weekday](https://stackoverflow.com/questions/38928919/how-to-get-the-weekday-from-day-of-month-using-pyspark/38931967)
* [pyspark sql function](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html)

In [192]:
# get year/month/day of month
from pyspark.sql.functions import year, month, dayofmonth, hour, minute, to_date
df4 = df3.withColumn("year", year(df3.datetime))
df4 = df4.withColumn("month", month(df4.datetime))
df4 = df4.withColumn("dayofmonth", dayofmonth(df4.datetime))
df4 = df4.withColumn("hour", hour(df4.datetime))
df4 = df4.withColumn("minute", minute(df4.datetime))
df4 = df4.withColumn("todate", to_date(df4.datetime))

In [193]:
df4.take(3)

[Row(ID=10078659, Case Number='HY267429', Date='05/19/2015 11:57:00 PM', Block='010XX E 79TH ST', IUCR='143A', Primary Type='WEAPONS VIOLATION', Description='UNLAWFUL POSS OF HANDGUN', Location Description='STREET', Arrest=True, Domestic=False, Beat=624, District=6, Ward=8, Community Area='44', FBI Code='15', X Coordinate=1184626, Y Coordinate=1852799, year=2015, Updated On='05/26/2015 12:42:06 PM', Latitude=41.751242944, Longitude=-87.599004724, Location='(41.751242944, -87.599004724)', datetime=datetime.datetime(2015, 5, 19, 11, 57), month=5, dayofmonth=19, hour=11, minute=57, todate=datetime.date(2015, 5, 19)),
 Row(ID=10078598, Case Number='HY267408', Date='05/19/2015 11:50:00 PM', Block='067XX N SHERIDAN RD', IUCR='3731', Primary Type='INTERFERENCE WITH PUBLIC OFFICER', Description='OBSTRUCTING IDENTIFICATION', Location Description='STREET', Arrest=True, Domestic=False, Beat=2432, District=24, Ward=49, Community Area='1', FBI Code='24', X Coordinate=1167071, Y Coordinate=1944859, 

In [190]:
# get the weekday as a number (1 = Monday) and as a string
from pyspark.sql.functions import date_format
df4.select('datetime', date_format('datetime', 'u').alias('dow_number'), date_format('datetime', 'E').alias('dow_string')).show()


+-------------------+----------+----------+
|           datetime|dow_number|dow_string|
+-------------------+----------+----------+
|2015-05-19 11:57:00|         2|       Tue|
|2015-05-19 11:50:00|         2|       Tue|
|2015-05-19 11:47:00|         2|       Tue|
|2015-05-19 11:46:00|         2|       Tue|
|2015-05-19 11:45:00|         2|       Tue|
|2015-05-19 11:40:00|         2|       Tue|
|2015-05-19 11:30:00|         2|       Tue|
|2015-05-19 11:30:00|         2|       Tue|
|2015-05-19 11:30:00|         2|       Tue|
|2015-05-19 11:30:00|         2|       Tue|
+-------------------+----------+----------+
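
As a quick sanity check of the weekday values, the same date can be inspected with Python's standard library. This is a plain-Python sketch, independent of Spark; `isoweekday()` uses the Monday = 1 convention, and `%a` assumes the default (English) locale.

```python
from datetime import datetime

ts = datetime(2015, 5, 19, 11, 57)  # first datetime value shown above

dow_number = ts.isoweekday()    # Monday = 1 ... Sunday = 7
dow_string = ts.strftime("%a")  # abbreviated weekday name

print(dow_number, dow_string)
```
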



In [191]:
df4.withColumn("dow_string", date_format(df4.datetime, 'E')).withColumn("dow_number", date_format(df4.datetime, 'u')).show()

+--------+-----------+--------------------+--------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+-------------------+-----+----------+----------+----------+
|      ID|Case Number|                Date|               Block|IUCR|        Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|year|          Updated On|    Latitude|    Longitude|            Location|           datetime|month|dayofmonth|dow_string|dow_number|
+--------+-----------+--------------------+--------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+-------

> **Calculate pairwise frequency of categorical columns**

In [180]:
df5 = df4.crosstab('Beat', 'District')
df5.show()

+-------------+---+---+---+---+---+---+---+---+---+---+
|Beat_District| 10| 11| 14| 24|  3|  4|  6|  7|  8|  9|
+-------------+---+---+---+---+---+---+---+---+---+---+
|         2432|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|
|          421|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|
|         1111|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|
|          725|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|
|         1434|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|
|         1011|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|          314|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|
|          813|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|
|          624|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|
|          935|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|
+-------------+---+---+---+---+---+---+---+---+---+---+
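
Conceptually, a crosstab counts how often each pair of values occurs and lays the counts out as a table. A minimal pure-Python sketch of that idea with `collections.Counter` follows; the (Beat, District) pairs are hypothetical and repeated on purpose so that some counts exceed 1.

```python
from collections import Counter

# Hypothetical (Beat, District) pairs
pairs = [(624, 6), (624, 6), (2432, 24), (421, 4), (421, 4), (421, 4)]

counts = Counter(pairs)  # frequency of each pair

beats = sorted({beat for beat, _ in pairs})
districts = sorted({district for _, district in pairs})

# One row per Beat, one column per District
table = {beat: [counts[(beat, d)] for d in districts] for beat in beats}

print(districts)
for beat, row in table.items():
    print(beat, row)
```
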



> **Drop rows with NA**

In [161]:
df4.dropna().count()

10

> **Change Column Type**

[change a Dataframe column from String type to Double type in pyspark](https://stackoverflow.com/questions/32284620/how-to-change-a-dataframe-column-from-string-type-to-double-type-in-pyspark)

In [183]:
df5

DataFrame[Beat_District: string, 10: bigint, 11: bigint, 14: bigint, 24: bigint, 3: bigint, 4: bigint, 6: bigint, 7: bigint, 8: bigint, 9: bigint]

In [184]:
df5.withColumn("Beat_District", df5["Beat_District"].cast("double"))

DataFrame[Beat_District: double, 10: bigint, 11: bigint, 14: bigint, 24: bigint, 3: bigint, 4: bigint, 6: bigint, 7: bigint, 8: bigint, 9: bigint]

> **GroupBy**

Syntax is similar to pandas

In [None]:
# `train` stands for any DataFrame with `Age` and `Purchase` columns
train.groupby('Age').agg({'Purchase': 'mean'}).show()
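
For intuition, the group-then-aggregate step can be written out in plain Python. The sketch below is not Spark code, and the (Age, Purchase) rows are hypothetical values chosen for illustration.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical rows of (Age, Purchase)
rows = [("26-35", 100), ("26-35", 300), ("36-45", 50)]

# Group the Purchase values by Age
groups = defaultdict(list)
for age, purchase in rows:
    groups[age].append(purchase)

# Aggregate each group with the mean
avg_purchase = {age: mean(values) for age, values in groups.items()}

print(avg_purchase)
```
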

> **Sort**

In [None]:
# assumes df has an `age` column
df.sort(df.age.desc()).collect()

> **Get the NULL/NaN percentage**

[StackOverFlow](https://stackoverflow.com/questions/33900726/count-number-of-non-nan-entries-in-each-column-of-spark-dataframe-with-pyspark?noredirect=1&lq=1)

In [None]:
from pyspark.sql.functions import col, count, isnan, lit, sum

def count_not_null(c, nan_as_null=False):
    """Use conversion between boolean and integer
    - False -> 0
    - True ->  1
    """
    pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True))
    return sum(pred.cast("integer")).alias(c)

In [None]:
# get number of not null values in each column
df.agg(*[count_not_null(c) for c in df.columns]).show()

In [None]:
# get percentage of not null values in each column
exprs = [(count_not_null(c) / count("*")).alias(c) for c in df.columns]
df.agg(*exprs).show()
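
The counting trick used above (cast a boolean to 0/1 and sum) can be tried on a tiny in-memory example. This is a plain-Python sketch with hypothetical rows; `fraction_not_null` is an illustrative helper, not a Spark function.

```python
import math

# Hypothetical rows; None plays the role of NULL
rows = [
    {"a": 1.0, "b": None},
    {"a": float("nan"), "b": "x"},
    {"a": 3.0, "b": "y"},
    {"a": None, "b": "z"},
]

def fraction_not_null(rows, column, nan_as_null=False):
    """Share of rows whose value in `column` is not null (optionally not NaN)."""
    def keep(value):
        if value is None:
            return False
        if nan_as_null and isinstance(value, float) and math.isnan(value):
            return False
        return True
    # True counts as 1 and False as 0 when summed
    return sum(keep(row[column]) for row in rows) / len(rows)

print(fraction_not_null(rows, "a"))
print(fraction_not_null(rows, "a", nan_as_null=True))
print(fraction_not_null(rows, "b"))
```
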


> **Filter NULL rows**

In [None]:
df.where(col("month").isNull())

> **Filter**

In [None]:
df.filter(df['age']>24).show()

> **Replace values in a certain column**

In [None]:
from pyspark.sql.functions import regexp_replace
# strip the " AM" / " PM" suffix from the Date column
df.withColumn('Date', regexp_replace('Date', ' PM', '')).withColumn('Date', regexp_replace('Date', ' AM', ''))

---

# Spark SQL

Since Spark SQL works on top of the DataFrame API, we first need to turn our file or RDD into a Spark DataFrame:
* RDD -> DataFrame
* file -> DataFrame

> **Transform RDD into DataFrame**

In [196]:
myRDD2.take(5)

['ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location',
 '10078659,HY267429,05/19/2015 11:57:00 PM,010XX E 79TH ST,143A,WEAPONS VIOLATION,UNLAWFUL POSS OF HANDGUN,STREET,true,false,0624,006,8,44,15,1184626,1852799,2015,05/26/2015 12:42:06 PM,41.751242944,-87.599004724,"(41.751242944, -87.599004724)"',
 '10078598,HY267408,05/19/2015 11:50:00 PM,067XX N SHERIDAN RD,3731,INTERFERENCE WITH PUBLIC OFFICER,OBSTRUCTING IDENTIFICATION,STREET,true,false,2432,024,49,1,24,1167071,1944859,2015,05/26/2015 12:42:06 PM,42.004255918,-87.660691083,"(42.004255918, -87.660691083)"',
 '10078625,HY267417,05/19/2015 11:47:00 PM,026XX E 77TH ST,2170,NARCOTICS,POSSESSION OF DRUG EQUIPMENT,STREET,true,false,0421,004,7,43,18,1195299,1854463,2015,05/26/2015 12:42:06 PM,41.755552462,-87.559839339,"(41.755552462, -87.559839339)"',
 '10078662,HY267423,05/19/2015 1

In [197]:
myRDD3 = myRDD2.map(lambda x: x.split(","))

In [200]:
myRDD3.take(2)

[['ID',
  'Case Number',
  'Date',
  'Block',
  'IUCR',
  'Primary Type',
  'Description',
  'Location Description',
  'Arrest',
  'Domestic',
  'Beat',
  'District',
  'Ward',
  'Community Area',
  'FBI Code',
  'X Coordinate',
  'Y Coordinate',
  'Year',
  'Updated On',
  'Latitude',
  'Longitude',
  'Location'],
 ['10078659',
  'HY267429',
  '05/19/2015 11:57:00 PM',
  '010XX E 79TH ST',
  '143A',
  'WEAPONS VIOLATION',
  'UNLAWFUL POSS OF HANDGUN',
  'STREET',
  'true',
  'false',
  '0624',
  '006',
  '8',
  '44',
  '15',
  '1184626',
  '1852799',
  '2015',
  '05/26/2015 12:42:06 PM',
  '41.751242944',
  '-87.599004724',
  '"(41.751242944',
  ' -87.599004724)"']]

In [201]:
myRDD3.toDF()

DataFrame[_1: string, _2: string, _3: string, _4: string, _5: string, _6: string, _7: string, _8: string, _9: string, _10: string, _11: string, _12: string, _13: string, _14: string, _15: string, _16: string, _17: string, _18: string, _19: string, _20: string, _21: string, _22: string]

In [205]:
myRDD3.toDF(['ID','Case Number','Date','Block','IUCR','Primary Type','Description','Location Description','Arrest','Domestic','Beat','District','Ward','Community Area','FBI Code','X Coordinate','Y Coordinate','Year','Updated On','Latitude','Longitude','Location'])

DataFrame[ID: string, Case Number: string, Date: string, Block: string, IUCR: string, Primary Type: string, Description: string, Location Description: string, Arrest: string, Domestic: string, Beat: string, District: string, Ward: string, Community Area: string, FBI Code: string, X Coordinate: string, Y Coordinate: string, Year: string, Updated On: string, Latitude: string, Longitude: string, Location: string]

> **Write SQL code in SparkSQL**

In [None]:
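# register the DataFrame as a table so that SQL can be run against it
# (registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is the newer equivalent)
df.registerTempTable("mydf")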

In [None]:
# returns a DataFrame, not a SQL temp table
sqlContext.sql("select * from mydf").show()

# Machine Learning Models in PySpark

[Basic Statistics - RDD-based API](https://spark.apache.org/docs/2.2.0/mllib-statistics.html)

Whenever we want to use MLlib, we need to convert our data into a specific format (here, an RDD of dense vectors).

In [171]:
from pyspark.mllib.linalg import Vectors

In [172]:
rdd = sc.parallelize(['0,1,2,3','1,4,3,4'])

In [178]:
rdd.collect()

['0,1,2,3', '1,4,3,4']

In [175]:
def parsePoint(line):
    values = [float(x) for x in line.split(",")]
    return Vectors.dense(values)

In [176]:
rdd2 = rdd.map(parsePoint)

In [177]:
rdd2.collect()

[DenseVector([0.0, 1.0, 2.0, 3.0]), DenseVector([1.0, 4.0, 3.0, 4.0])]

Then we can use pretty much anything provided in mllib

In [37]:
from pyspark.mllib.stat import Statistics

In [41]:
summary = Statistics.colStats(rdd2)
print(summary.mean())  # a dense vector containing the mean value for each column
print(summary.variance())  # column-wise variance
print(summary.numNonzeros())  # number of nonzeros in each column

[ 0.5  2.5  2.5  3.5]
[ 0.5  4.5  0.5  0.5]
[ 1.  2.  2.  2.]
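
The numbers above can be cross-checked with Python's `statistics` module on the same two rows. This plain-Python sketch (not MLlib code) also suggests that the reported variance is the sample variance, which divides by n - 1.

```python
from statistics import mean, variance

# The same two rows that were parsed into dense vectors above
rows = [[0.0, 1.0, 2.0, 3.0], [1.0, 4.0, 3.0, 4.0]]

columns = list(zip(*rows))  # transpose rows into columns

means = [mean(c) for c in columns]
variances = [variance(c) for c in columns]  # sample variance (n - 1 in the denominator)
nonzeros = [sum(1 for v in c if v != 0) for c in columns]

print(means)
print(variances)
print(nonzeros)
```
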


In [None]:
rdd2.collect()

# Reference

* [Complete Guide on DataFrame Operations in PySpark](https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/)
* [Introduction to DataFrames - Python](https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html)