In [1]:
import pyspark.sql.functions as f
from pyspark.sql.session import SparkSession , DataFrame


# Architecture
## Standalone
![image](jupyter-files/standalone.png)
## Kubernetes
![image](jupyter-files/kuber.png)
## Yarn

![image](jupyter-files/yarn.png)






# Make a SparkSession


In [3]:
spark = SparkSession.builder.appName("Presentation").getOrCreate()
spark

# Read Data

In [8]:
# Read and create dataframes from diffrent file formats
df_people = spark.read.json('data/people.json')
df_sales = spark.read.csv('data/sales_info.csv',inferSchema=True,header=True)



In [9]:
# print schema
df_people.printSchema()
df_sales.printSchema()


root
 |-- Extra: string (nullable = true)
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

root
 |-- Company: string (nullable = true)
 |-- PersonName: string (nullable = true)
 |-- Sales: double (nullable = true)



In [10]:
# print schema
df_people.show()
df_sales.show()


+---------+---+-------+
|    Extra|age|   name|
+---------+---+-------+
|ExtraData| 25|Charlie|
|     null| 30|  Frank|
|     null| 26|   Tina|
|     null| 30|    Amy|
|     null| 28|   Carl|
|     null| 19|   John|
|     null| 17|Vanessa|
|     null| 29|  Sarah|
|     null| 23|  Linda|
|     null| 21|   Mike|
|     null| 22|  Chris|
|     null| 24|    Sam|
+---------+---+-------+

+-------+----------+-----+
|Company|PersonName|Sales|
+-------+----------+-----+
|   GOOG|       Sam|200.0|
|   GOOG|   Charlie|120.0|
|   GOOG|     Frank|340.0|
|   MSFT|      Tina|600.0|
|   MSFT|       Amy|124.0|
|   MSFT|   Vanessa|243.0|
|     FB|      Carl|870.0|
|     FB|     Sarah|350.0|
|   APPL|      John|250.0|
|   APPL|     Linda|130.0|
|   APPL|      Mike|750.0|
|   APPL|     Chris|350.0|
+-------+----------+-----+



# Basics (Spark SQL)

In [11]:
df_people

DataFrame[Extra: string, age: bigint, name: string]

In [12]:
df_people['age']

Column<'age'>

In [13]:
# Check UI | what is a job? 
# df_people.select('name')
df_people.select('name').show()

+-------+
|   name|
+-------+
|Charlie|
|  Frank|
|   Tina|
|    Amy|
|   Carl|
|   John|
|Vanessa|
|  Sarah|
|  Linda|
|   Mike|
|  Chris|
|    Sam|
+-------+



In [14]:
# DataFrames, like RDDs, are immutable
df_people.withColumn("new_col", f.col("age")).show()

+---------+---+-------+-------+
|    Extra|age|   name|new_col|
+---------+---+-------+-------+
|ExtraData| 25|Charlie|     25|
|     null| 30|  Frank|     30|
|     null| 26|   Tina|     26|
|     null| 30|    Amy|     30|
|     null| 28|   Carl|     28|
|     null| 19|   John|     19|
|     null| 17|Vanessa|     17|
|     null| 29|  Sarah|     29|
|     null| 23|  Linda|     23|
|     null| 21|   Mike|     21|
|     null| 22|  Chris|     22|
|     null| 24|    Sam|     24|
+---------+---+-------+-------+



In [15]:
# Spark is lazy
new_people_df = df_people.withColumn("new_col", f.col("age"))

people_with_sales = new_people_df.join(df_sales, new_people_df.name==df_sales.PersonName)
people_with_sales.show()


+---------+---+-------+-------+-------+----------+-----+
|    Extra|age|   name|new_col|Company|PersonName|Sales|
+---------+---+-------+-------+-------+----------+-----+
|ExtraData| 25|Charlie|     25|   GOOG|   Charlie|120.0|
|     null| 30|  Frank|     30|   GOOG|     Frank|340.0|
|     null| 26|   Tina|     26|   MSFT|      Tina|600.0|
|     null| 30|    Amy|     30|   MSFT|       Amy|124.0|
|     null| 28|   Carl|     28|     FB|      Carl|870.0|
|     null| 19|   John|     19|   APPL|      John|250.0|
|     null| 17|Vanessa|     17|   MSFT|   Vanessa|243.0|
|     null| 29|  Sarah|     29|     FB|     Sarah|350.0|
|     null| 23|  Linda|     23|   APPL|     Linda|130.0|
|     null| 21|   Mike|     21|   APPL|      Mike|750.0|
|     null| 24|    Sam|     24|   GOOG|       Sam|200.0|
+---------+---+-------+-------+-------+----------+-----+



In [16]:
people_with_sales.drop(f.col("PersonName")).show()


+---------+---+-------+-------+-------+-----+
|    Extra|age|   name|new_col|Company|Sales|
+---------+---+-------+-------+-------+-----+
|ExtraData| 25|Charlie|     25|   GOOG|120.0|
|     null| 30|  Frank|     30|   GOOG|340.0|
|     null| 26|   Tina|     26|   MSFT|600.0|
|     null| 30|    Amy|     30|   MSFT|124.0|
|     null| 28|   Carl|     28|     FB|870.0|
|     null| 19|   John|     19|   APPL|250.0|
|     null| 17|Vanessa|     17|   MSFT|243.0|
|     null| 29|  Sarah|     29|     FB|350.0|
|     null| 23|  Linda|     23|   APPL|130.0|
|     null| 21|   Mike|     21|   APPL|750.0|
|     null| 24|    Sam|     24|   GOOG|200.0|
+---------+---+-------+-------+-------+-----+



In [17]:
# GroupBy
people_with_sales.groupBy("Company").agg(f.sum("Sales")).show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1130.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



In [18]:
# use sql syntax
people_with_sales.createOrReplaceTempView("temp_table")
spark.sql("select * from temp_table").show()

+---------+---+-------+-------+-------+----------+-----+
|    Extra|age|   name|new_col|Company|PersonName|Sales|
+---------+---+-------+-------+-------+----------+-----+
|ExtraData| 25|Charlie|     25|   GOOG|   Charlie|120.0|
|     null| 30|  Frank|     30|   GOOG|     Frank|340.0|
|     null| 26|   Tina|     26|   MSFT|      Tina|600.0|
|     null| 30|    Amy|     30|   MSFT|       Amy|124.0|
|     null| 28|   Carl|     28|     FB|      Carl|870.0|
|     null| 19|   John|     19|   APPL|      John|250.0|
|     null| 17|Vanessa|     17|   MSFT|   Vanessa|243.0|
|     null| 29|  Sarah|     29|     FB|     Sarah|350.0|
|     null| 23|  Linda|     23|   APPL|     Linda|130.0|
|     null| 21|   Mike|     21|   APPL|      Mike|750.0|
|     null| 24|    Sam|     24|   GOOG|       Sam|200.0|
+---------+---+-------+-------+-------+----------+-----+



# Spark Mlib

In [7]:
from pyspark.ml.clustering import KMeans

# Loads data.
dataset = spark.read.format("libsvm").load("data/sample_kmeans_data.txt")

dataset.printSchema()
dataset.show(1)
# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)


# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

23/05/19 20:58:32 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan.


root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

+-----+---------+
|label| features|
+-----+---------+
|  0.0|(3,[],[])|
+-----+---------+
only showing top 1 row

Cluster Centers: 
[9.1 9.1 9.1]
[0.1 0.1 0.1]


# Datawarehousing


In [23]:
people_with_sales.write.partitionBy("Company").mode("overwrite").json("output/tables/people_with_sales")

# Spark Streaming
![image](jupyter-files/streaming2.png)


### WordCount | SparkStreaming with windowing

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread and batch interval of 1 second

ssc = StreamingContext(spark.sparkContext, 3)
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.window(6).pprint()

print(type(lines))


ssc.start()
ssc.awaitTermination()


## SparkStructuredStreaming

In [None]:
# Create DataFrame representing the stream of input lines from connection to localhost:9999
# `nc -lk 9999`
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("checkpointLocation","checkpoints") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
   f.explode(
       f.split(lines.value, " ")
   ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .option("checkpointLocation", "checkpoint")\
    .format("console")
query.start()
