# Workshop 2
Learning pyspark
<br>
Getting familliar with spark's functions

---
# Installation
1. install docker
2. docker pull jupyter/all-spark-notebook
3. docker run -d --name notebook -p 10000:8888 -e JUPYTER_ENABLE_LAB=yes -v ~/Development/DockerWorkspace:/home/jovyan/work jupyter/all-spark-notebook

---
# Importing pyspark

In [3]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext()
spark = SparkSession(sc)
# run this cell only once

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at <ipython-input-1-3bd1e62be2c5>:4 

---
# Checking version

In [4]:
sc.version

'2.4.5'

---
# Spark's RDD

In [38]:
a = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
rdd1 = sc.parallelize(a)
print("\n", rdd1)


 ParallelCollectionRDD[162] at parallelize at PythonRDD.scala:195


---
# first(), collect(), count(), take(), max()

In [41]:
print("\n", rdd1.first())
print("\n", rdd1.collect())
print("\n", rdd1.count())
print("\n", rdd1.take(2))
print("\n", rdd1.max())


 0

 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

 10

 [0, 1]

 9


---
# reduce()

In [11]:
def r(x, y):
    return y

print("\n", rdd1.reduce(r))


 9


---
# map(), filter()

In [12]:
rdd2 = rdd1.map(lambda x: x * 2 - 10)
print("\n", rdd2.collect())

rdd3 = rdd1.filter(lambda x: x % 2 == 0)
print("\n", rdd3.collect())


 [-10, -8, -6, -4, -2, 0, 2, 4, 6, 8]

 [0, 2, 4, 6, 8]


---
# flatMap()

In [13]:
rdd1 = sc.parallelize([(1, [0, 1, 2, 3]), (4, [6, 2, 1, 4, 3, 6]), (2, [0, 3])])
rdd1 = rdd1.flatMap(lambda x: x[1][:3])
print("\n", rdd1.collect())


 [0, 1, 2, 6, 2, 1, 0, 3]


---
# union(), intersection(), distinct()

In [15]:
rdd1 = sc.parallelize([0, 1, 2, 3, 4, 5, 5, 6, 7, 8, 9])
rdd2 = sc.parallelize([4, 4, 5, 5, 13, 13, 14, 14])

rdd3 = rdd1.union(rdd2)
print("\n", rdd3.collect())

rdd4 = rdd1.intersection(rdd2)
print("\n", rdd4.collect())

print("\n", rdd2.distinct().collect())


 [0, 1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 4, 4, 5, 5, 13, 13, 14, 14]

 [4, 5]

 [13, 14, 4, 5]


---
# sortByKey(), reduceByKey(), groupByKey()

In [17]:
rdd1 = sc.parallelize([(2, "Pink Orange"), (1, "Green Apple"), (4, "Yellow Banana"), (2, "Red Orange"), (2, "Yellow Orange")])
rdd2 = rdd1.sortByKey(ascending=True)
print("\n", rdd2.collect())

rdd3 = rdd1.reduceByKey(lambda v1, v2: v1 + " " + v2)
print("\n", rdd3.collect())

rdd4 = rdd1.groupByKey()
print("\n", rdd4.collect())

print("\n", rdd4.map(lambda x: (x[0], list(x[1]))).collect())


 [(1, 'Green Apple'), (2, 'Pink Orange'), (2, 'Red Orange'), (2, 'Yellow Orange'), (4, 'Yellow Banana')]

 [(1, 'Green Apple'), (2, 'Pink Orange Red Orange Yellow Orange'), (4, 'Yellow Banana')]

 [(1, <pyspark.resultiterable.ResultIterable object at 0x7f1d3514d350>), (2, <pyspark.resultiterable.ResultIterable object at 0x7f1d3514d2d0>), (4, <pyspark.resultiterable.ResultIterable object at 0x7f1d3514d450>)]

 [(1, ['Green Apple']), (2, ['Pink Orange', 'Red Orange', 'Yellow Orange']), (4, ['Yellow Banana'])]


---
# Multiple functions, then collect()

In [21]:
rdd1 = sc.parallelize(list(range(100)))
rdd1 = rdd1.map(lambda x: x * 2 - 10).filter(lambda x: x % 3).distinct()
print("\n", rdd1.count())
print("\n", rdd1.collect())


 100

 [-6, 0, 6, 12, 18, 24, 30, 36, 42, 48, 54, 60, 66, 72, 78, 84, 90, 96, 102, 108, 114, 120, 126, 132, 138, 144, 150, 156, 162, 168, 174, 180, 186, -10, -4, 2, 8, 14, 20, 26, 32, 38, 44, 50, 56, 62, 68, 74, 80, 86, 92, 98, 104, 110, 116, 122, 128, 134, 140, 146, 152, 158, 164, 170, 176, 182, 188, -8, -2, 4, 10, 16, 22, 28, 34, 40, 46, 52, 58, 64, 70, 76, 82, 88, 94, 100, 106, 112, 118, 124, 130, 136, 142, 148, 154, 160, 166, 172, 178, 184]


---
# Reading from file

In [22]:
rdd1 = sc.textFile("text_test.txt")
print("\n", rdd1.collect())


 ['hello', 'second line', 'last line', 'bye']


In [27]:
print("\n", rdd1.map(lambda x: x[:2]).flatMap(lambda x: x).reduce(lambda x, y: x + '.' + y))


 h.e.s.e.l.a.b.y


---
# Reading from dataframe

In [28]:
columns = ["firstname", "middlename", "lastname", "year", "gender", "salary"]

data = [
    ('James', '', 'Smith', '1991-04-01', 'M', 3000),
    ('Michael', 'Rose', '', '2000-05-19', 'M', 4000),
    ('Robert', '', 'Williams', '1978-09-05', 'M', 4000),
    ('Maria', 'Anne', 'Jones', '1967-12-01', 'F', 4000),
    ('Jen', 'Mary', 'Brown', '1980-02-17', 'F', -1),
]

df = spark.createDataFrame(data=data, schema=columns)
df.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|      year|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



---
# Reading from csv

In [29]:
df = spark.read.option("header", True).csv("cities.csv")
df.show()

+-----+-------+-------+-----+-------+-------+-------+-----+------------------+--------+
| LatD| "LatM"| "LatS"| "NS"| "LonD"| "LonM"| "LonS"| "EW"|            "City"| "State"|
+-----+-------+-------+-----+-------+-------+-------+-----+------------------+--------+
|   41|      5|     59|  "N"|     80|     39|      0|  "W"|      "Youngstown"|      OH|
|   42|     52|     48|  "N"|     97|     23|     23|  "W"|         "Yankton"|      SD|
|   46|     35|     59|  "N"|    120|     30|     36|  "W"|          "Yakima"|      WA|
|   42|     16|     12|  "N"|     71|     48|      0|  "W"|       "Worcester"|      MA|
|   43|     37|     48|  "N"|     89|     46|     11|  "W"| "Wisconsin Dells"|      WI|
|   36|      5|     59|  "N"|     80|     15|      0|  "W"|   "Winston-Salem"|      NC|
|   49|     52|     48|  "N"|     97|      9|      0|  "W"|        "Winnipeg"|      MB|
|   39|     11|     23|  "N"|     78|      9|     36|  "W"|      "Winchester"|      VA|
|   34|     14|     24|  "N"|   

---
# Running SQL queries

In [31]:
df.createOrReplaceTempView("CITY_DATA")
df2 = spark.sql('SELECT * from CITY_DATA')
df2.show()

+-----+-------+-------+-----+-------+-------+-------+-----+------------------+--------+
| LatD| "LatM"| "LatS"| "NS"| "LonD"| "LonM"| "LonS"| "EW"|            "City"| "State"|
+-----+-------+-------+-----+-------+-------+-------+-----+------------------+--------+
|   41|      5|     59|  "N"|     80|     39|      0|  "W"|      "Youngstown"|      OH|
|   42|     52|     48|  "N"|     97|     23|     23|  "W"|         "Yankton"|      SD|
|   46|     35|     59|  "N"|    120|     30|     36|  "W"|          "Yakima"|      WA|
|   42|     16|     12|  "N"|     71|     48|      0|  "W"|       "Worcester"|      MA|
|   43|     37|     48|  "N"|     89|     46|     11|  "W"| "Wisconsin Dells"|      WI|
|   36|      5|     59|  "N"|     80|     15|      0|  "W"|   "Winston-Salem"|      NC|
|   49|     52|     48|  "N"|     97|      9|      0|  "W"|        "Winnipeg"|      MB|
|   39|     11|     23|  "N"|     78|      9|     36|  "W"|      "Winchester"|      VA|
|   34|     14|     24|  "N"|   