Before starting, make sure this Jupyter notebook was launched with the 'pyspark' command, not the 'jupyter notebook' command.

This notebook is written for Python 2.7. For Python 3.x, the main difference is that print statements need parentheses (Python 3 also has no `unicode` type).

The 'dummyData' folder read by the code below must be in the same directory as this notebook (the paths are relative).

----------------------------------------------------------------------------------------------------------------------

Installing PySpark with Jupyter on a Mac:
Here is a reliable way to install PySpark on macOS:
```
brew install jupyter
brew install scala
brew install apache-spark
brew cask install caskroom/versions/java8
```
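Add the environment variables to your ~/.bash_profile. Open it with:
```
nano ~/.bash_profile
```
Add the following to the file (adjust the Spark version in SPARK_HOME and the py4j-*.zip name to match the files in your Spark installation):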

```
if which pyspark > /dev/null; then
  export SPARK_HOME="/usr/local/Cellar/apache-spark/2.3.1/libexec/"
  export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/build:$PYTHONPATH
  export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
fi

export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS=notebook
```

Press Ctrl+X to exit nano, then press Y to save the file.

Reload the profile with:
```
source ~/.bash_profile
```
That's all!
Launch PySpark (with the settings above, it opens a Jupyter notebook) by typing:

```
pyspark
```

<b>What is Apache Spark?</b>

A 'distributed' computing engine.
Analogy: A team working on a big project.

Apache Spark has a 'master node' and 'slave (worker) nodes', which together form the cluster.
Analogy: A team has a leader and multiple members.

A Spark 'job' is submitted to the cluster's master.
Analogy: A project is assigned to the team leader.

The master then distributes (assigns) the work among the slaves and waits for their results.
It's a non-agile team :P
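To make the analogy concrete, here is a plain-Python sketch (no Spark involved, Python 3 standard library only) of the master/worker pattern: the 'master' splits the data into partitions, a thread pool stands in for the slave nodes, and the master combines the partial results. The function names are hypothetical; real Spark does this across machines, not threads.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Master: split the input into roughly equal chunks (partitions)."""
    return [data[i::num_partitions] for i in range(num_partitions)]


def count_words(partition):
    """Work done by one 'slave': count the words in its own partition only."""
    return sum(len(line.split()) for line in partition)


def run_job(lines, num_workers=3):
    """Master: hand partitions to workers, wait for them, then combine the results."""
    partitions = split_into_partitions(lines, num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partial_counts = list(pool.map(count_words, partitions))
    # Each worker only saw its own partition; the master stitches the answers together
    return sum(partial_counts)


lines = ["the quick brown fox", "jumps over", "the lazy dog", "hello spark"]
print(run_job(lines))  # 11
```

The workers never talk to each other; all coordination goes through the master, which is exactly the "non-agile team".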



<br><br>
Diagrams you will often see:
<img src="assetsPySparkTut/2.jpg">
<img style="height:400px" src="assetsPySparkTut/7.png">
<img style="height:400px" src="assetsPySparkTut/8.jpg">
<img src="assetsPySparkTut/1.png">

<b>What actually happens?</b>

1. The first layer is the interpreter: the Spark shell uses a modified Scala interpreter (PySpark uses the Python interpreter, which controls the JVM side of Spark through Py4J).

2. As you enter code in the Spark console (creating RDDs and applying operators), Spark builds an operator graph.

3. When the user runs an action (like collect), the graph is submitted to the DAG scheduler, which divides the operator graph into (map and reduce) stages.

4. A stage consists of tasks, one per partition of the input data. The DAG scheduler pipelines operators together to optimize the graph; for example, many map operators can be scheduled in a single stage. This optimization is key to Spark's performance. The final result of the DAG scheduler is a set of stages.

5. The stages are passed to the task scheduler, which launches tasks through the cluster manager (Spark Standalone, YARN or Mesos). The task scheduler does not know about dependencies among stages.

6. The workers execute the tasks. Each application gets its own executor JVMs on the workers. A worker knows only about the code that is passed to it.
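The pipelining in steps 3 and 4 can be modeled in plain Python (a sketch of the idea, not Spark's actual scheduler): narrow operators such as `map` and `filter` are fused into one pass over each partition, forming stage 1; a `reduceByKey`-style aggregation needs data from every partition, so it forces a shuffle and starts stage 2. All function names here are hypothetical.

```python
from collections import defaultdict


def run_stage1(partition):
    """Stage 1: map and filter are pipelined into one pass over the partition.
    `mapped` is a generator, so no intermediate list is built between them."""
    mapped = ((word.lower(), 1) for word in partition)   # map
    return [(w, n) for w, n in mapped if len(w) > 2]     # filter


def shuffle(stage1_outputs, num_reducers):
    """Stage boundary: route every (key, value) pair to the reducer that owns its key."""
    buckets = [defaultdict(list) for _ in range(num_reducers)]
    for output in stage1_outputs:
        for key, value in output:
            buckets[hash(key) % num_reducers][key].append(value)
    return buckets


def run_stage2(bucket):
    """Stage 2: reduce (sum) the values of each key in this reducer's bucket."""
    return {key: sum(values) for key, values in bucket.items()}


word_partitions = [["Spark", "is", "fast"], ["spark", "DAG", "is", "lazy"]]
stage1 = [run_stage1(p) for p in word_partitions]        # one task per partition
stage2 = [run_stage2(b) for b in shuffle(stage1, num_reducers=2)]
result = {}
for partial in stage2:
    result.update(partial)
print(sorted(result.items()))
# [('dag', 1), ('fast', 1), ('lazy', 1), ('spark', 2)]
```

Adding more `map`/`filter` steps would only make stage 1 longer; only the key-based aggregation creates a new stage.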

----------------------------------------------------------------------------------------------------------------------
<b>What happens (Simpler version):</b>

A. Coding interface:
    1. We write code (RDDs, DataFrames, etc.)
B. Code interpretation (Spark engine):
    1. The code is converted into a DAG (Directed Acyclic Graph)
    2. The DAG is converted into (highly optimized) tasks -> think of this as the 'project execution plan' in the team analogy
C. Scheduling and task distribution:
    1. The tasks generated from the DAG are used to distribute the workload
D. Task execution:
    1. Slaves execute the tasks and report back to the master
E. Results compilation:
    1. The master stitches the results together

<b>Where does all of this happen?</b>

<b>A.</b> Coding interface
<img style="height:200px;" src="assetsPySparkTut/3.jpg">
<img style="height:200px;" src="assetsPySparkTut/4.jpg">

We use Zeppelin in our production environment.
Key differences from Jupyter:<br>
1. Every notebook cell starts with a declaration of the interpreter type: %python, %pyspark, %sql
2. You can use different interpreters for different cells
3. The SQL interpreter has a built-in visualization tool
4. Process data with Spark (%pyspark), save it to Hive (distributed SQL), then use %sql to load and visualize it!


<img style="height:500px;" src="assetsPySparkTut/6.png">

<b>B.</b> Code interpretation (Spark engine) and
<b>C.</b> Scheduling and task distribution
both happen on the master node of Spark.

<b>D.</b> Task execution
happens on the individual slaves (they operate independently, unaware of each other).
<b>E.</b> Results compilation
    1. The master stitches the results together and reports them to the user

<b>Why is this powerful?</b>

Consider the example of a JOIN query after a series of selects.

Explained on the whiteboard

<b>Infrastructure we use</b>

Explained on the whiteboard


<b>Memory management - Where is data stored after every step?</b><br>
Explained with a diagram on the whiteboard (RAM + disk)

<img style="height:400px;" src="assetsPySparkTut/5.png">




<b>Data structures in Spark</b>
'RDD' (Resilient Distributed Dataset): the basic building block<br>
'DataFrame': similar to pandas DataFrames (tables with named columns)<br>
'Dataset': a strongly typed DataFrame, available only in Scala and Java<br>
<br>
We care ONLY about DataFrames. DataFrames (dfs) are like SQL tables, and we can query them just like SQL tables.<br>
<b>Treat DataFrame+Spark+Python as a fancy new SQL tool</b>
<br>
Ignore the complex backend, the distributed machinery and scalability. Data scientists and analysts should worry about the data, not the system or infrastructure.

If you are good at Python and SQL, you can easily become a Spark expert!

<b>Let's start coding!</b><br>
Things to remember -><br> 
1. A DataFrame is like a SQL table
2. Spark can connect to almost any data source (input/output)
3. The computing power available is tremendous, but optimize wherever possible (it's costly!)

<br><br><br><br>

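Spark is written in Scala and runs on the JVM. PySpark does not convert Python code to Scala: DataFrame operations are sent through Py4J to Spark's JVM engine, which plans and runs them, and only Python-specific code such as UDFs and RDD lambdas runs in Python worker processes on the cluster.<br>
Using notebooks hides many elements of Spark code from us.<br>
The SparkContext (the entry point to Spark) is the variable 'sc'.<br>
The SQLContext (the entry point to the Spark SQL engine) is the variable 'sqlContext'.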

<b>Let's look at the data architecture</b><br>
Hierarchy in the data:
Growers ->
Farms ->
Fields

In [164]:
#Loading data

#sqlContext.read.{fileType}({fileLocation})
#Creates Dataframe out of our data!

fields = sqlContext.read.json("dummyData/landing/field/*.json")
#fields = sqlContext.read.json("s3n://data-lake-us-east-2-549323063936-validated/NonExploded_New_data_field.json/*.json")


In [165]:
#Read growers and farms here

#TODO
growers = sqlContext.read.json("dummyData/landing/grower/*.json")
farms = sqlContext.read.json("dummyData/landing/farm/*.json")

In [166]:
#Printing Schemas of data
fields.printSchema()

root
 |-- field: struct (nullable = true)
 |    |-- associations: struct (nullable = true)
 |    |    |-- agrian.farm: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- agrian.field: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- agrian.grower: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |-- deleted_at: string (nullable = true)
 |    |-- id: string (nullable = true)



In [11]:
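# What does this schema mean?
# Each row has a single column 'field', which is a struct containing:
#   associations: a struct of three string arrays (agrian.farm, agrian.field, agrian.grower)
#   deleted_at:   a string (null when the field is not deleted)
#   id:           a string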
In [79]:
#TODO
#Print the schemas for grower and farms
growers.printSchema()
farms.printSchema()

root
 |-- grower: struct (nullable = true)
 |    |-- associations: struct (nullable = true)
 |    |    |-- agrian.grower: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |-- data: struct (nullable = true)
 |    |    |-- data1: struct (nullable = true)
 |    |    |    |-- subData1: long (nullable = true)
 |    |    |    |-- subData2: long (nullable = true)
 |    |    |-- data2: string (nullable = true)
 |    |-- deleted_at: string (nullable = true)
 |    |-- id: string (nullable = true)

root
 |-- farm: struct (nullable = true)
 |    |-- associations: struct (nullable = true)
 |    |    |-- agrian.farm: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- agrian.grower: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |-- deleted_at: string (nullable = true)
 |    |-- id: string (nullable = true)



In [167]:
#SELECT queries

fields.select('field')
#Returns a new dataframe. fields df remains unchanged

DataFrame[field: struct<associations:struct<agrian.farm:array<string>,agrian.field:array<string>,agrian.grower:array<string>>,deleted_at:string,id:string>]

In [168]:
print fields.field
#Returns a Column expression, not the data itself! Columns are meant to be used inside select(), filter(), withColumn(), etc.


Column<field>


In [169]:
fields.show()
fields.select('field').show()
#Prints the first 20 rows of the dataframe

+--------------------+
|               field|
+--------------------+
|[[[farm1], [field...|
|[[[farm6], [field...|
|[[[farm5], [field...|
|[[[farm3], [field...|
|[[[farm3], [field...|
|[[[farm4], [field...|
|[[[farm7], [field...|
|[[[farm2], [field...|
|[[[farm1], [field...|
|[[[farm6], [field...|
+--------------------+

+--------------------+
|               field|
+--------------------+
|[[[farm1], [field...|
|[[[farm6], [field...|
|[[[farm5], [field...|
|[[[farm3], [field...|
|[[[farm3], [field...|
|[[[farm4], [field...|
|[[[farm7], [field...|
|[[[farm2], [field...|
|[[[farm1], [field...|
|[[[farm6], [field...|
+--------------------+



In [41]:
fields.select('field.*').show()
#Notice the difference: 'field.*' expands the struct into separate top-level columns!

+--------------------+----------+-------+
|        associations|deleted_at|     id|
+--------------------+----------+-------+
|[[farm1], [field9...|2018-01-21|field10|
|[[farm6], [field6...|      null| field6|
|[[farm5], [field7...|      null| field7|
|[[farm3], [field1...|      null| field1|
|[[farm3], [field2...|      null| field2|
|[[farm4], [field3...|      null| field3|
|[[farm7], [field4...|      null| field4|
|[[farm2], [field8...|      null| field8|
|[[farm1], [field9...|      null| field9|
|[[farm6], [field5...|      null| field5|
+--------------------+----------+-------+



In [53]:
from pyspark.sql.functions import col
fields.filter(col("field.deleted_at").isNotNull())
#Notice that it returns a DataFrame: nothing is computed until an action such as show() is called

DataFrame[field: struct<associations:struct<agrian.farm:array<string>,agrian.field:array<string>,agrian.grower:array<string>>,deleted_at:string,id:string>]

In [54]:
fields.filter(col("field.deleted_at").isNotNull()).show()

+--------------------+
|               field|
+--------------------+
|[[[farm1], [field...|
+--------------------+



In [170]:
fields.where(col("field.deleted_at").isNotNull()).show()
#Where clause works as well!

+--------------------+
|               field|
+--------------------+
|[[[farm1], [field...|
+--------------------+



In [135]:
fields.where("fields.deleted_at != 'null'").show()
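#Another way: pass a SQL condition string to where()
#Careful: this returned NO rows. != 'null' compares against the STRING 'null', and any comparison with a real NULL is never true
#Write the condition as "<column> IS NOT NULL" instead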

+------------+----------+---+
|associations|deleted_at| id|
+------------+----------+---+
+------------+----------+---+



In [64]:
fields['field']

#Returns a Column object

Column<field>

In [62]:
fields.filter(fields['field.deleted_at'].isNotNull()).show()
#Another way of doing it right!

+--------------------+
|               field|
+--------------------+
|[[[farm1], [field...|
+--------------------+



In [83]:
#TODO
#Filter fields that are NOT deleted, then remove the nested 'field' column.
#Overwrite the dataframe fields with the new result

#Equivalent alternative: flatten first, then filter
# fields = fields.select('field.*').filter(col("deleted_at").isNull())
fields = fields.filter(col("field.deleted_at").isNull()).select('field.*')
fields.show()
#Keep .show() on its own line: show() returns None, so `fields = ....show()` would overwrite fields with None

+--------------------+----------+------+
|        associations|deleted_at|    id|
+--------------------+----------+------+
|[[farm6], [field6...|      null|field6|
|[[farm5], [field7...|      null|field7|
|[[farm3], [field1...|      null|field1|
|[[farm3], [field2...|      null|field2|
|[[farm4], [field3...|      null|field3|
|[[farm7], [field4...|      null|field4|
|[[farm2], [field8...|      null|field8|
|[[farm1], [field9...|      null|field9|
|[[farm6], [field5...|      null|field5|
+--------------------+----------+------+
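In plain Python terms (a model of the semantics, not how Spark executes it), the transformation above keeps only the records whose `field.deleted_at` is null and then lifts the members of the `field` struct up to top-level keys, which is what `select('field.*')` does. The two sample records are hypothetical, trimmed to two members.

```python
raw_rows = [
    {"field": {"id": "field10", "deleted_at": "2018-01-21"}},
    {"field": {"id": "field6", "deleted_at": None}},
]

# filter(col("field.deleted_at").isNull())
not_deleted = [row for row in raw_rows if row["field"]["deleted_at"] is None]

# select('field.*'): the struct's members become the top-level columns
flattened = [dict(row["field"]) for row in not_deleted]

print(flattened)
```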



In [82]:
#TODO 
#Apply the same transformation for Growers and farms

farms = farms.filter(col("farm.deleted_at").isNull()).select('farm.*')
growers = growers.filter(col("grower.deleted_at").isNull()).select('grower.*')
farms.show()
growers.show()


+--------------------+----------+-----+
|        associations|deleted_at|   id|
+--------------------+----------+-----+
|[[farm1], [grower1]]|      null|farm1|
|[[farm6], [grower2]]|      null|farm6|
|[[farm7], [grower3]]|      null|farm7|
|[[farm4], [grower2]]|      null|farm4|
|[[farm5], [grower2]]|      null|farm5|
|[[farm2], [grower4]]|      null|farm2|
|[[farm3], [grower2]]|      null|farm3|
+--------------------+----------+-----+

+--------------------+--------------------+----------+-------+
|        associations|                data|deleted_at|     id|
+--------------------+--------------------+----------+-------+
|[[grower1, grower2]]|[[1, 2], No Data ...|      null|grower6|
|         [[grower1]]|                null|      null|grower1|
|         [[grower4]]|                null|      null|grower4|
|         [[grower5]]|                null|      null|grower5|
|         [[grower2]]|                null|      null|grower2|
|         [[grower3]]|                null|      null|g

In [84]:
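#At this point, farms, fields and growers are defined, but NOT held in memory: Spark is lazy, so each DataFrame is just a query plan
#Every action (show, write, count...) re-reads the JSON files unless you call .cache() on the DataFrame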

In [88]:
#Let's look at renaming columns
#Use withColumnRenamed(Old, New)

fields.withColumnRenamed('id','grower_id_temp').show()

fields.show()
#Notice that fields remains unchanged, as we didn't overwrite the df

+--------------------+----------+--------------+
|        associations|deleted_at|grower_id_temp|
+--------------------+----------+--------------+
|[[farm6], [field6...|      null|        field6|
|[[farm5], [field7...|      null|        field7|
|[[farm3], [field1...|      null|        field1|
|[[farm3], [field2...|      null|        field2|
|[[farm4], [field3...|      null|        field3|
|[[farm7], [field4...|      null|        field4|
|[[farm2], [field8...|      null|        field8|
|[[farm1], [field9...|      null|        field9|
|[[farm6], [field5...|      null|        field5|
+--------------------+----------+--------------+

+--------------------+----------+------+
|        associations|deleted_at|    id|
+--------------------+----------+------+
|[[farm6], [field6...|      null|field6|
|[[farm5], [field7...|      null|field7|
|[[farm3], [field1...|      null|field1|
|[[farm3], [field2...|      null|field2|
|[[farm4], [field3...|      null|field3|
|[[farm7], [field4...|      null|f

In [97]:
fields.select('associations').printSchema()
#select always returns a DataFrame, so use select to pull a nested column out into its own DataFrame

root
 |-- associations: struct (nullable = true)
 |    |-- agrian.farm: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- agrian.field: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- agrian.grower: array (nullable = true)
 |    |    |-- element: string (containsNull = true)



In [104]:
fields.select('associations.agrian.farm')
#This won't work :(
#Spark reads 'associations.agrian.farm' as the path associations -> agrian -> farm, but 'agrian.farm' is a single name containing a '.'

In [106]:
fields.select('associations.`agrian.farm`').show()
#Wrap the name in backticks instead!

+-----------+
|agrian.farm|
+-----------+
|    [farm6]|
|    [farm5]|
|    [farm3]|
|    [farm3]|
|    [farm4]|
|    [farm7]|
|    [farm2]|
|    [farm1]|
|    [farm6]|
+-----------+
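If you build column paths programmatically, a small helper can add the backticks for you. `column_path` below is a hypothetical helper (not part of PySpark): it joins the nesting levels with '.' and wraps any level whose own name contains a '.' in backticks, producing the same string form that `select` accepted above.

```python
def column_path(*levels):
    """Join nested column names into a Spark column path string,
    wrapping any name that itself contains a '.' in backticks."""
    quoted = []
    for name in levels:
        if "." in name:
            name = "`" + name + "`"
        quoted.append(name)
    return ".".join(quoted)


print(column_path("associations", "agrian.farm"))  # associations.`agrian.farm`
print(column_path("field", "id"))                  # field.id
```

In the notebook it would be used as `fields.select(column_path('associations', 'agrian.farm'))`.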



In [116]:
#check type of columns
for name, dtype in fields.dtypes:
    print(name, dtype)


('associations', 'struct<agrian.farm:array<string>,agrian.field:array<string>,agrian.grower:array<string>>')
('deleted_at', 'string')
('id', 'string')


In [117]:
for name, dtype in fields.select('associations.`agrian.farm`').dtypes:
    print(name, dtype)

('agrian.farm', 'array<string>')


In [131]:
#Exploding Arrays in Df
import pyspark.sql.functions as pys
associations = fields.select('associations.*')
associations.show()
associations.withColumn("associationsExploded", pys.explode(associations['`agrian.farm`'])).show()

+-----------+------------+-------------+
|agrian.farm|agrian.field|agrian.grower|
+-----------+------------+-------------+
|    [farm6]|    [field6]|    [grower2]|
|    [farm5]|    [field7]|    [grower2]|
|    [farm3]|    [field1]|    [grower2]|
|    [farm3]|    [field2]|    [grower2]|
|    [farm4]|    [field3]|    [grower2]|
|    [farm7]|    [field4]|    [grower3]|
|    [farm2]|    [field8]|    [grower4]|
|    [farm1]|    [field9]|    [grower1]|
|    [farm6]|    [field5]|    [grower2]|
+-----------+------------+-------------+

+-----------+------------+-------------+--------------------+
|agrian.farm|agrian.field|agrian.grower|associationsExploded|
+-----------+------------+-------------+--------------------+
|    [farm6]|    [field6]|    [grower2]|               farm6|
|    [farm5]|    [field7]|    [grower2]|               farm5|
|    [farm3]|    [field1]|    [grower2]|               farm3|
|    [farm3]|    [field2]|    [grower2]|               farm3|
|    [farm4]|    [field3]|    [g
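`explode` produces one output row per array element and copies the other columns onto each of those rows. In Spark, rows whose array is null or empty are dropped by `explode`, while `explode_outer` keeps them with a null. A plain-Python model of that behavior (`explode_rows` is hypothetical; `grower6` mirrors the grower output above, `growerX` is made up):

```python
def explode_rows(rows, array_col, new_col, outer=False):
    """Model of withColumn(new_col, explode(array_col)): one output row per
    array element, other columns copied. With outer=True, rows with a
    null/empty array are kept with new_col = None (like explode_outer)."""
    out = []
    for row in rows:
        values = row.get(array_col) or []
        if not values and outer:
            out.append(dict(row, **{new_col: None}))
        for value in values:
            out.append(dict(row, **{new_col: value}))
    return out


grower_rows = [
    {"agrian.grower": ["grower1", "grower2"], "id": "grower6"},
    {"agrian.grower": ["grower3"], "id": "grower3"},
    {"agrian.grower": [], "id": "growerX"},
]
exploded = explode_rows(grower_rows, "agrian.grower", "grower")
for r in exploded:
    print(r["id"], r["grower"])
# grower6 grower1
# grower6 grower2
# grower3 grower3
```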

In [None]:
#Joins in pyspark
#df1.join(df2, df1.ColumnName== df2.columnName, typeOfJoin)

#Suppose we have 2 dataframes: df1 with column 'id1' and df2 with column 'id2'.
#To left outer join df1 with df2 on id1 == id2:

#df1.join(df2, df1.id1 == df2.id2, 'left_outer')
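A left outer join keeps every row of the left DataFrame: each matching right row is attached, and left rows with no match get nulls in the right-hand columns. A plain-Python model over lists of dicts (`left_outer_join` is hypothetical; `farm3` with `field1`/`field2` mirrors the join output further down, `farm99` is a made-up farm with no fields):

```python
def left_outer_join(left, right, left_key, right_key):
    """Model of left.join(right, left[left_key] == right[right_key], 'left_outer').
    Assumes the two sides have no column names in common."""
    right_columns = set()
    for rrow in right:
        right_columns.update(rrow)
    out = []
    for lrow in left:
        matches = [rrow for rrow in right if rrow[right_key] == lrow[left_key]]
        if matches:
            # One output row per matching right row
            for rrow in matches:
                out.append(dict(lrow, **rrow))
        else:
            # No match: keep the left row and fill the right-hand columns with None
            out.append(dict(lrow, **{c: None for c in right_columns}))
    return out


farm_rows = [{"id": "farm3"}, {"id": "farm99"}]
field_rows = [
    {"field_id": "field1", "farm_id": "farm3"},
    {"field_id": "field2", "farm_id": "farm3"},
]
joined = left_outer_join(farm_rows, field_rows, "id", "farm_id")
for row in joined:
    print(row)
```

Swapping the two sides changes which rows are guaranteed to survive, which is why the choice of the left DataFrame matters.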

In [147]:
#Rename columns before a join to avoid two columns with the same name in the result
#(deleted_at is not renamed here, which is why it appears twice in the join output below)
fieldsToJoin = fields.withColumnRenamed('id', 'field_id').withColumnRenamed('associations', 'field_associations')
fieldsToJoin.show()

+--------------------+----------+--------+
|  field_associations|deleted_at|field_id|
+--------------------+----------+--------+
|[[farm6], [field6...|      null|  field6|
|[[farm5], [field7...|      null|  field7|
|[[farm3], [field1...|      null|  field1|
|[[farm3], [field2...|      null|  field2|
|[[farm4], [field3...|      null|  field3|
|[[farm7], [field4...|      null|  field4|
|[[farm2], [field8...|      null|  field8|
|[[farm1], [field9...|      null|  field9|
|[[farm6], [field5...|      null|  field5|
+--------------------+----------+--------+



In [149]:
#TODO

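#Join all fields with farms
#Use the farm id in each field's associations as the join key

#Discussion: should farms be joined onto fields, or fields onto farms?
#With farms on the left of a left outer join, every farm is kept, even a farm without fields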

farms.join(fieldsToJoin, fieldsToJoin['field_associations.`agrian.farm`'][0]== farms.id, 'left_outer').show()

+--------------------+----------+-----+--------------------+----------+--------+
|        associations|deleted_at|   id|  field_associations|deleted_at|field_id|
+--------------------+----------+-----+--------------------+----------+--------+
|[[farm1], [grower1]]|      null|farm1|[[farm1], [field9...|      null|  field9|
|[[farm6], [grower2]]|      null|farm6|[[farm6], [field5...|      null|  field5|
|[[farm6], [grower2]]|      null|farm6|[[farm6], [field6...|      null|  field6|
|[[farm7], [grower3]]|      null|farm7|[[farm7], [field4...|      null|  field4|
|[[farm4], [grower2]]|      null|farm4|[[farm4], [field3...|      null|  field3|
|[[farm5], [grower2]]|      null|farm5|[[farm5], [field7...|      null|  field7|
|[[farm2], [grower4]]|      null|farm2|[[farm2], [field8...|      null|  field8|
|[[farm3], [grower2]]|      null|farm3|[[farm3], [field2...|      null|  field2|
|[[farm3], [grower2]]|      null|farm3|[[farm3], [field1...|      null|  field1|
+--------------------+------

In [171]:
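#Saving dataframes as JSON
#Spark writes a DIRECTORY of part-*.json files (one per partition), not a single file
#mode('overwrite') lets you re-run the cell; the default mode fails if the directory already exists
farms.write.mode('overwrite').json('dummyData/validated1')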

In [172]:
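#Combining into one JSON file: coalesce(1) merges everything into a single partition,
#so the output directory contains a single part file. Only do this for small data!
farms.coalesce(1).write.mode('overwrite').json('dummyData/validated2')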
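Why a directory? Each partition is written by its own task into its own part file. A plain-Python model of this using a temporary directory (the helper names are hypothetical, and the file names only imitate Spark's `part-NNNNN` pattern; real Spark names also contain a unique id):

```python
import json
import os
import tempfile


def write_json_partitions(partitions, out_dir):
    """Model of df.write.json(out_dir): one JSON-lines part file per partition.
    os.makedirs fails if out_dir exists, like Spark's default write mode."""
    os.makedirs(out_dir)
    for i, partition in enumerate(partitions):
        path = os.path.join(out_dir, "part-%05d.json" % i)
        with open(path, "w") as f:
            for row in partition:
                f.write(json.dumps(row) + "\n")
    return sorted(os.listdir(out_dir))


def coalesce(partitions, n):
    """Model of coalesce(n): merge existing partitions into n
    (here a simple round-robin grouping, no full shuffle)."""
    merged = [[] for _ in range(n)]
    for i, partition in enumerate(partitions):
        merged[i % n].extend(partition)
    return merged


farm_partitions = [[{"id": "farm1"}], [{"id": "farm2"}], [{"id": "farm3"}]]
base = tempfile.mkdtemp()
first = write_json_partitions(farm_partitions, os.path.join(base, "validated1"))
second = write_json_partitions(coalesce(farm_partitions, 1), os.path.join(base, "validated2"))
print(first)   # ['part-00000.json', 'part-00001.json', 'part-00002.json']
print(second)  # ['part-00000.json']
```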

In [162]:
#UDFs (User Defined Functions) apply arbitrary Python code to a column, row by row
#Use them for complex transformations, but prefer built-in pyspark.sql.functions when they exist:
#Python UDFs are slower because every value has to be shipped to a Python worker process

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

def isBoundaryPresent(columnElement):
    #Spark passes SQL NULL to a Python UDF as None
    if not columnElement:
        return False
    try:
        value = columnElement.strip()
    except AttributeError:
        #Not a string: treat as "no boundary"
        return False
    #Values that mean "no boundary"
    if value in ("", "null", "MULTIPOLYGON EMPTY"):
        return False
    #Any other WKT geometry starting with MULTI (e.g. MULTIPOLYGON (...)) counts as a boundary
    return value.startswith("MULTI")

#The function can be passed to udf() directly; no lambda wrapper is needed
checkIfBoundaryPresent = udf(isBoundaryPresent, BooleanType())


In [None]:
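#'df' stands for any DataFrame with a string column 'boundary' (none of the dummy data above has one)
dfNew = df.withColumn('isBoundaryPresent', checkIfBoundaryPresent(df.boundary))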
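Conceptually, `withColumn` with a Python UDF calls the function once per row and stores the result in the new column; SQL nulls arrive in Python as `None`. A plain-Python model with hypothetical `boundary` values, using a hypothetical simplified predicate `has_multi_boundary` rather than the notebook's own function:

```python
def has_multi_boundary(value):
    """Simplified stand-in for the boundary check: True for non-empty MULTI* WKT."""
    if not value:                      # None (SQL null) or ""
        return False
    value = value.strip()
    return value.startswith("MULTI") and value != "MULTIPOLYGON EMPTY"


def with_column(rows, new_col, func, input_col):
    """Model of df.withColumn(new_col, udf(func)(df[input_col])): call func once per row."""
    return [dict(row, **{new_col: func(row[input_col])}) for row in rows]


boundary_rows = [
    {"id": "field1", "boundary": "MULTIPOLYGON (((0 0, 1 0, 1 1, 0 0)))"},
    {"id": "field2", "boundary": "MULTIPOLYGON EMPTY"},
    {"id": "field3", "boundary": None},
]
with_flag = with_column(boundary_rows, "isBoundaryPresent", has_multi_boundary, "boundary")
for row in with_flag:
    print(row["id"], row["isBoundaryPresent"])
# field1 True
# field2 False
# field3 False
```

Because the predicate is ordinary Python, it can be unit-tested like this without starting Spark before wrapping it in `udf()`.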