In [2]:
import os
import sys

# Make the Spark workers and the driver use the same interpreter as this kernel,
# so PySpark does not pick up a different `python` from PATH.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

from pyspark import SparkContext, SparkConf
from pyspark.rdd import RDD

# Build the configuration: run Spark locally with 4 worker threads ("local[4]").
conf = SparkConf().setAppName("Notebook 1").setMaster("local[4]")
# getOrCreate() reuses a SparkContext that is already running in this kernel, so
# re-executing this cell no longer fails with "Cannot run multiple SparkContexts at once".
# NOTE: if a context already exists, `conf` is not re-applied to it.
sc = SparkContext.getOrCreate(conf=conf)

23/11/19 02:46:35 WARN Utils: Your hostname, abdou-Nitro-AN515-55 resolves to a loopback address: 127.0.1.1; using 10.188.49.204 instead (on interface wlp0s20f3)
23/11/19 02:46:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/19 02:46:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
def pretty_print_plan(rdd):
    """Print the execution plan (lineage) of ``rdd`` line by line.

    PySpark's ``RDD.toDebugString()`` returns the plan as UTF-8 encoded
    ``bytes``, or ``None`` when no debug string is available. A ``str`` is
    accepted as well, so the helper also works if the plan is already text.

    Args:
        rdd: object exposing ``toDebugString()`` (normally a PySpark RDD).

    Returns:
        None. Each plan line is written to stdout. Nothing is printed when
        the plan is unavailable (``None``).
    """
    plan = rdd.toDebugString()
    if plan is None:
        # Calling .decode() on None would raise AttributeError; there is simply nothing to show.
        return
    if isinstance(plan, bytes):
        plan = plan.decode("utf-8")
    for line in plan.split("\n"):
        print(line)

## The Moby Dick text

![MobyDick](https://m.media-amazon.com/images/I/91xMMAxMS1L._SL1500_.jpg)

And so begins an incredible odyssey that will take ship, sailors and readers to the edge of darkness. "Moby Dick" is an epic tragedy of terrible dramatic power: the endless, desperate quest of the captain of a drunken ship on a voyage of no return.

### Word count

In [3]:
%%time
text_file = sc.textFile("../data/mobydick.txt")
type(text_file)

CPU times: user 3.89 ms, sys: 1.03 ms, total: 4.92 ms
Wall time: 882 ms


pyspark.rdd.RDD

In [4]:
%%time
words = text_file.flatMap(lambda line: line.split(" "))

CPU times: user 1.17 ms, sys: 305 µs, total: 1.47 ms
Wall time: 25.6 ms


In [5]:
%%time
not_empty = words.filter(lambda x: x!="")

CPU times: user 80 µs, sys: 20 µs, total: 100 µs
Wall time: 104 µs


In [6]:
%%time
key_val = not_empty.map(lambda x: (x, 1))
counts= key_val.reduceByKey(lambda x1, x2: x1+x2)

CPU times: user 10.5 ms, sys: 0 ns, total: 10.5 ms
Wall time: 146 ms


In [7]:
%%time
## get the number of different words
diff_words = counts.count()
## get the number of words
nb_words = counts.map(lambda x: x[1]).reduce(lambda x, y: x+y)

print(f"number of different words = {diff_words} number of words = {nb_words} nb of occurence per word = {round(nb_words / diff_words, 2)}")

[Stage 0:>                                                          (0 + 2) / 2]

number of different words = 19840 number of words = 115314 nb of occurence per word = 5.81
CPU times: user 16.6 ms, sys: 985 µs, total: 17.6 ms
Wall time: 1.81 s


                                                                                

### Most common words

In [8]:
%%time
## Pythonic way
p_data = counts.collect()

CPU times: user 57.4 ms, sys: 7.36 ms, total: 64.7 ms
Wall time: 205 ms


In [9]:
%%time
p_data.sort(key=lambda x: x[1])
print("Most common words in Moby Dick:\n\n"+"\n".join(['%s:\t%d'%c for c in reversed(p_data[-5:])]))

Most common words in Moby Dick:

the:	6611
of:	3460
and:	2969
a:	2466
to:	2339
CPU times: user 4.33 ms, sys: 0 ns, total: 4.33 ms
Wall time: 4.25 ms


We can do this on the head node with plain Python, but that approach does not scale when the data is HUGE.

---

Instead, we use PySpark RDD operations so that the computation stays distributed and scales with the data.

In [10]:
%%time
# spark way
reversed_counts = counts.map(lambda x: (x[1], x[0]))
sorted_counts = reversed_counts.sortByKey(ascending=False)

CPU times: user 17.8 ms, sys: 3.51 ms, total: 21.3 ms
Wall time: 301 ms


In [11]:
## Execution plan (RDD lineage) of the operations defined above
pretty_print_plan(rdd=sorted_counts)

(2) PythonRDD[19] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[17] at mapPartitions at PythonRDD.scala:160 []
 |  ShuffledRDD[16] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[15] at sortByKey at <timed exec>:3 []
    |  PythonRDD[14] at sortByKey at <timed exec>:3 []
    |  MapPartitionsRDD[5] at mapPartitions at PythonRDD.scala:160 []
    |  ShuffledRDD[4] at partitionBy at NativeMethodAccessorImpl.java:0 []
    +-(2) PairwiseRDD[3] at reduceByKey at <timed exec>:2 []
       |  PythonRDD[2] at reduceByKey at <timed exec>:2 []
       |  ../data/mobydick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
       |  ../data/mobydick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []


In [12]:
%%time
## Now we come back to the head node bc we need the data now
first_10 = sorted_counts.take(10)
print("MOst 10 common words in Moby Dick:\n"+"\n".join([f"{k}\t:\t\t\t{v}" for (v, k) in first_10]))

MOst 10 common words in Moby Dick:
the	:			6611
of	:			3460
and	:			2969
a	:			2466
to	:			2339
in	:			1969
;	:			1949
that	:			1430
his	:			1275
I	:			1180
CPU times: user 6.32 ms, sys: 197 µs, total: 6.52 ms
Wall time: 178 ms


#### To sum up

- An RDD (Resilient Distributed Dataset) is a distributed, immutable collection and is the core data structure of Spark
- You cannot operate on an RDD directly; you work with it through **Transformations** and **Actions**
- **Transformations** turn an RDD into a new RDD
- **Actions** return their results to the head node (driver)
- Once an action has finished, its result lives on the head node, no longer on the worker nodes
- Transformations are recorded lazily in what we call an **Execution Plan** (the RDD lineage)
- The plan is only executed when an **Action** needs a result
- Intermediate results can be stored explicitly by caching an RDD
- For scalability, use RDD operations instead of processing everything on the head node (e.g. finding the most common words)

## Meteorological data

In [4]:
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
%pylab inline

sqlContext = SQLContext(sc)

%pylab is deprecated, use %matplotlib inline and import the required libraries.
Populating the interactive namespace from numpy and matplotlib




In [5]:
data_path = "../data/NY.parquet"

# Name the format explicitly: DataFrameReader.load() without `format` falls back to
# the `spark.sql.sources.default` setting, which is only parquet by default.
df = sqlContext.read.parquet(data_path)

df.printSchema()

root
 |-- Station: string (nullable = true)
 |-- Measurement: string (nullable = true)
 |-- Year: long (nullable = true)
 |-- Values: binary (nullable = true)
 |-- dist_coast: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- state: string (nullable = true)
 |-- name: string (nullable = true)


In [6]:
df.show(n=1)  # first row only; long values such as `Values` are truncated by default

+-----------+-----------+----+--------------------+-----------------+--------------+------------------+-----------------+-----+-----------------+
|    Station|Measurement|Year|              Values|       dist_coast|      latitude|         longitude|        elevation|state|             name|
+-----------+-----------+----+--------------------+-----------------+--------------+------------------+-----------------+-----+-----------------+
|USW00094704|   PRCP_s20|1945|[00 00 00 00 00 0...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|   NY|DANSVILLE MUNI AP|
+-----------+-----------+----+--------------------+-----------------+--------------+------------------+-----------------+-----+-----------------+


#### projection

In [8]:
df.select(["dist_coast", "Values", "Year"]).show()  # projection: keep three columns

+-----------------+--------------------+----+
|       dist_coast|              Values|Year|
+-----------------+--------------------+----+
|361.8320007324219|[00 00 00 00 00 0...|1945|
|361.8320007324219|[99 46 52 46 0B 4...|1946|
|361.8320007324219|[79 4C 75 4C 8F 4...|1947|
|361.8320007324219|[72 48 7A 48 85 4...|1948|
|361.8320007324219|[BB 49 BC 49 BD 4...|1949|
|361.8320007324219|[6E 4B 93 4B BB 4...|1950|
|361.8320007324219|[27 4A 32 4A 28 4...|1951|
|361.8320007324219|[54 4B 60 4B 6A 4...|1952|
|361.8320007324219|[48 4A 37 4A 28 4...|1953|
|361.8320007324219|[DE 4A D4 4A CA 4...|2000|
|361.8320007324219|[D9 44 C7 44 B6 4...|2001|
|361.8320007324219|[CF 4B B8 4B A1 4...|2002|
|361.8320007324219|[18 4B F1 4A D2 4...|2003|
|361.8320007324219|[CE 4A 9C 4A 6B 4...|2004|
|361.8320007324219|[DD 4C D3 4C C9 4...|2005|
|361.8320007324219|[91 4B 9F 4B AC 4...|2006|
|361.8320007324219|[39 4E 36 4E 34 4...|2007|
|361.8320007324219|[3A 4A 11 4A EA 4...|2008|
|361.8320007324219|[5C 4A 3F 4A 26

#### filter

In [16]:
df.where((df["Year"] == 1945) & (df["name"] == "DANSVILLE MUNI AP")).show()

+-----------+-----------+----+--------------------+-----------------+--------------+------------------+-----------------+-----+-----------------+
|    Station|Measurement|Year|              Values|       dist_coast|      latitude|         longitude|        elevation|state|             name|
+-----------+-----------+----+--------------------+-----------------+--------------+------------------+-----------------+-----+-----------------+
|USW00094704|   PRCP_s20|1945|[00 00 00 00 00 0...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|   NY|DANSVILLE MUNI AP|
|USW00094704|   TMAX_s20|1945|[00 00 00 00 00 0...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|   NY|DANSVILLE MUNI AP|
|USW00094704|   SNOW_s20|1945|[00 00 00 00 00 0...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|   NY|DANSVILLE MUNI AP|
|USW00094704|   TMIN_s20|1945|[00 00 00 00 00 0...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|   

#### filter and projection

In [18]:
(
    df.where((df["Year"] == 1945) & (df["name"] == "DANSVILLE MUNI AP"))
    .select(["Station", "latitude", "longitude", "name"])
    .show()
)

+-----------+--------------+------------------+-----------------+
|    Station|      latitude|         longitude|             name|
+-----------+--------------+------------------+-----------------+
|USW00094704|42.57080078125|-77.71330261230469|DANSVILLE MUNI AP|
|USW00094704|42.57080078125|-77.71330261230469|DANSVILLE MUNI AP|
|USW00094704|42.57080078125|-77.71330261230469|DANSVILLE MUNI AP|
|USW00094704|42.57080078125|-77.71330261230469|DANSVILLE MUNI AP|
|USW00094704|42.57080078125|-77.71330261230469|DANSVILLE MUNI AP|
|USW00094704|42.57080078125|-77.71330261230469|DANSVILLE MUNI AP|
|USW00094704|42.57080078125|-77.71330261230469|DANSVILLE MUNI AP|
|USW00094704|42.57080078125|-77.71330261230469|DANSVILLE MUNI AP|
|USW00094704|42.57080078125|-77.71330261230469|DANSVILLE MUNI AP|
|USW00094704|42.57080078125|-77.71330261230469|DANSVILLE MUNI AP|
+-----------+--------------+------------------+-----------------+


#### To sum up

- DataFrames are an efficient way to store and process tabular data
- All values in a column have the same type (like a column in a database table)
- A good way to store a DataFrame on disk is the Parquet file format