In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .master("spark://172.25.0.101:7077") \
    .appName("DanAppStreaming") \
    .getOrCreate()

In [173]:
! cd Datasets/ && ls

Canada.xlsx		      P4-Section6-Homework-Dataset.csv
inputSample.txt		      yelp_academic_dataset_business.json
IsolationFull.txt	      yelp_academic_dataset_checkin.json
MIG_ITALY.csv		      yelp_academic_dataset_review.json
MIG_ITALY_NO_QUOTE.csv	      yelp_academic_dataset_tip.json
Mig_Italy_Streaming.csv       yelp_academic_dataset_user.json
Mig_Italy_Streaming_data.csv


## Reading Flatfile

In [93]:
staticMigItaly = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/home/jovyan/spark/Datasets/Mig_Italy_Streaming.csv")

In [94]:
staticMigItaly.printSchema()

root
 |-- CO2: string (nullable = true)
 |-- Country of birth/nationality: string (nullable = true)
 |-- VAR: string (nullable = true)
 |-- Variable: string (nullable = true)
 |-- GEN: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- COU: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- YEA: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Value: integer (nullable = true)
 |-- Flag Codes: string (nullable = true)
 |-- Flags: string (nullable = true)



In [95]:
staticMigItaly.createOrReplaceTempView("Mig_Italy_Streaming")
staticItalySchema = staticMigItaly.schema

In [96]:
staticItalySchema.names

['CO2',
 'Country of birth/nationality',
 'VAR',
 'Variable',
 'GEN',
 'Gender',
 'COU',
 'Country',
 'YEA',
 'Year',
 'Value',
 'Flag Codes',
 'Flags']

## add a total cost column

In [97]:
staticMigItaly.na.fill(0)

DataFrame[CO2: string, Country of birth/nationality: string, VAR: string, Variable: string, GEN: string, Gender: string, COU: string, Country: string, YEA: int, Year: int, Value: int, Flag Codes: string, Flags: string]

In [98]:
from pyspark.sql.functions import window, column, desc, col
staticMigItaly\
.groupBy(col("Country"))\
.sum("Value")\
.orderBy("sum(Value)", ascending = False)\
.select(col("Country").alias("Country"),col("sum(Value)").alias("QtdPeople"))\
.show(5)

+-------------+---------+
|      Country|QtdPeople|
+-------------+---------+
|      Germany| 18810323|
|United States|  9192897|
|  Switzerland|  9038454|
|       France|  6271734|
|      Belgium|  5905870|
+-------------+---------+
only showing top 5 rows



## shuffle partitions to 5

In [99]:
# default value is 200
spark.conf.set("spark.sql.shuffle.partitions", "5")

## streaming code readStream

In [239]:
streamingMigItaly = spark.readStream\
.schema(staticItalySchema)\
.option("maxFilesPerTrigger", 1)\
.format("csv")\
.option("header", "true")\
.load("/home/jovyan/spark/Datasets/Streaming/*.csv")

In [240]:
print(type(streamingMigItaly))

<class 'pyspark.sql.dataframe.DataFrame'>


In [81]:
##streamingMigItaly

## Checking the streaming

In [241]:
streamingMigItaly.isStreaming # returns true

True

## Sum streaming Dataframe group by "window"

In [242]:
sumMigraPerCountry = streamingMigItaly\
.groupBy(col("Country"))\
.sum("Value")\
.select(col("Country").alias("Country"),col("sum(Value)").alias("QtdPeople"))

In [243]:
sumMigraPerCountry.printSchema()

root
 |-- Country: string (nullable = true)
 |-- QtdPeople: long (nullable = true)



## Call a streaming action to start the execution of this data flow

In [344]:
activityQuery = (
    sumMigraPerCountry.writeStream\
    .format("memory")\
    .queryName("countries_people")\
    .outputMode("complete")\
    .start()
)
#sumMigraPerCountry.awaitTermination()

In [329]:
print(type(activityQuery),type(sumMigraPerCountry))

<class 'pyspark.sql.streaming.StreamingQuery'> <class 'pyspark.sql.dataframe.DataFrame'>


In [408]:
activityQuery.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [342]:
activityQuery.stop()

In [341]:
spark.streams.active[0].isActive

True

## run queries

In [418]:
spark.sql("""
SELECT *
FROM countries_people
where `Country` = 'Turkey'  
ORDER BY `QtdPeople` DESC
""")\
.show(80)

+-------+---------+
|Country|QtdPeople|
+-------+---------+
| Turkey|    88398|
+-------+---------+



In [174]:
sumMigraPerCountryConsole.writeStream\
.format("console")\
.queryName("countries_people_console")\
.outputMode("complete")\
.start()

NameError: name 'sumMigraPerCountryConsole' is not defined

In [152]:
spark.sql("""
SELECT *
FROM countries_people_console
ORDER BY `QtdPeople` DESC
""")\
.show(5)

AnalysisException: Table or view not found: countries_people_console; line 3 pos 5;
'Sort ['QtdPeople DESC NULLS LAST], true
+- 'Project [*]
   +- 'UnresolvedRelation [countries_people_console]
