# Intro STREAMING

Se procede a construir una app de Structured Streaming que puede leer files de un folder local, conforme nuevos files son adicionados a dicho folder y luego aplicar operaciones a los nuevos datos.

Iniciamos con la creacion de la SparkSession y posteriormente la importacion de las librerias necesarias:


In [1]:
#Creacion SparkSession

from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('structured_streaming').getOrCreate()

# Librerias 

import pyspark.sql.functions as F
from pyspark.sql.types import *

Ahora, vamos a crear algunos datos auto-generados que serán colocados en el directorio local "demo" y que serán leidos via Structured Streaming.

La data generada estará en formato CSV y consta de 4 columnas:

1. User ID
2. App
3. Time spent (secs)
4. Age

In [2]:
#creacion del  dataset como archivos csv (apppend)
df_1=spark.createDataFrame([("XN203",'FB',300,30),("XN201",'Twitter',10,19),("XN202",'Insta',500,45)], 
                           ["user_id", "app" ,"time_in_secs","age"]).write.csv("demo",mode='append')

Se define el SCHEMA para los archivos creados con el fin de leerlos correctamente usando stream processing.

In [3]:
#define schema para el input data
schema=StructType().add("user_id", "string").add("app", "string").add("time_in_secs", "integer").add("age", "integer")


Leemos el archivo como un stream dataframe, cabe destacar que el API que se usa para leer dataframes estaticos es similar al que se usa para dataframes streaming, la unica diferencia quizas radica en el uso de readStream:

In [4]:
data=spark.readStream.option("sep", ",").schema(schema).csv("demo")

Validamos los valores del SCHEMA:

In [5]:
data.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- app: string (nullable = true)
 |-- time_in_secs: integer (nullable = true)
 |-- age: integer (nullable = true)



## Operaciones

Ahora podemos aplicar multiples tranformaciones, por ejemplo, haremos a continuacion un conteo simple de las entradas en cada app del DataFrame:

In [6]:
app_count = data.groupBy('app').count()

Para efectos del resultado debemos indicar el "outputMode" (usaremos complete lo que indica que escribiremos el resultado en el DataFrame completo cada vez) y la localizacion de salida, en este caso, colcocaremos los resultados en memoria, sin embargo, puede ser colocado en consola, cloud storage o cualueir otra localidad. 

Para ver el resultado usamos el comando basico Spark SQL para verlo como DataFrame de Pandas.


In [7]:
query=(app_count.writeStream.queryName('count_query').outputMode('complete').format('memory').start())

In [8]:
spark.sql("select * from count_query ").toPandas().head()

Unnamed: 0,app,count
0,Insta,1
1,FB,1
2,Twitter,1


Problemas con las versiones de **pysark**? **pandas**? Se recomienda actualizar las libs del ambiente de trabajo:

    conda update --all
    

Podemos hacer filtrado para ver solo records de por ejemplo, solo Facebook (FB) app. En caso de tener multiples entradas el tiempo indicado seria el promedio:

In [9]:
fb_data = data.filter(data['app']=='FB')
fb_avg_time = fb_data.groupBy('user_id').agg(F.avg("time_in_secs"))
fb_query = (fb_avg_time.writeStream.queryName('fb_query').outputMode('complete').format('memory').start())


In [10]:
spark.sql("select * from fb_query ").toPandas().head()

Unnamed: 0,user_id,avg(time_in_secs)
0,XN203,300.0


Ahora creamos mas entrada FB, para ver informacion relevante a multiples entradas:

In [11]:
df_2=spark.createDataFrame([("XN203",'FB',100,30),("XN201",'FB',10,19),("XN202",'FB',2000,45)],
                           ["user_id","app","time_in_secs","age"]).write.csv("demo",mode='append')

Se ha hecho append de los nuevos datos, notese como XN203 nos da un promedio de 200, que corresponde al promedio de 300 y 100:

In [13]:
spark.sql("select * from fb_query ").toPandas().head()

Unnamed: 0,user_id,avg(time_in_secs)
0,XN203,200.0
1,XN201,10.0
2,XN202,2000.0


Agregamos más datos...

In [14]:
df_3=spark.createDataFrame([("XN203",'FB',500,30), ("XN201",'Insta',30,19),("XN202",'Twitter',100,45)],
["user_id","app","time_in_secs","age"]).write.csv("demo",mode='append')

In [15]:
spark.sql("select * from fb_query ").toPandas().head()

Unnamed: 0,user_id,avg(time_in_secs)
0,XN203,300.0
1,XN201,10.0
2,XN202,2000.0


A continuación, aplicamos agregación (aggregation) y ordenamiento (sorting) al DataFrame existente en el folder local. Se agrupan las entradas por app y se calcula el tiempo total, en orden decreciente:

In [16]:
#app wise time spent

app_df=data.groupBy('app').agg(F.sum('time_in_secs').alias('total_time')).orderBy('total_time',ascending=False)

app_query=(app_df.writeStream.queryName('app_wise_query').outputMode('complete').format('memory').start())

In [18]:
spark.sql("select * from app_wise_query ").toPandas().head()

Unnamed: 0,app,total_time
0,FB,2910
1,Insta,530
2,Twitter,110


Ahora tenemos los resultados para cada app, el total de tiempo por todos los usuarios usando stream dataframe. Agregaremos nuevas entradas y veamos los resultados:

In [19]:
df_4=spark.createDataFrame([("XN203",'FB',500,30), ("XN201",'Insta',30,19),("XN202",'Twitter',100,45)],
                           ["user_id","app","time_in_secs","age"]).write.csv("demo",mode='append')

In [22]:
spark.sql("select * from app_wise_query ").toPandas().head()

Unnamed: 0,app,total_time
0,FB,3410
1,Insta,560
2,Twitter,210


Finalmente obtenemos el promedio de las edades de los usuarios para cada app y hacemos un sort en orden decreciente:

In [23]:
age_df = data.groupBy('app').agg(F.avg('age').alias('mean_age')).orderBy('mean_age', ascending=False)

In [24]:
age_query = (age_df.writeStream.queryName('age_query').outputMode('complete').format('memory').start())

In [25]:
df_5 = spark.createDataFrame([("XN210",'FB',500,50), ("XN255",'Insta',30,23),("XN222",'Twitter',100,30)], 
                           ["user_id","app","time_in_secs","age"]).write.csv("demo", mode='append')

In [26]:
spark.sql("select * from age_query ").toPandas().head(5)

Unnamed: 0,app,mean_age
0,Twitter,36.333333
1,FB,30.666667
2,Insta,27.666667


## JOINS

Algunas veces debemos hacer un merge de los datos entrantes con datos batch, veamos como hacer merge de datos entrantes (stream dataframe) con dataframes estaticos que contienen el full nombre de la app. 

In [28]:
app_df = spark.createDataFrame([('FB','FACEBOOK'),('Insta','INSTAGRAM'),('Twitter','TWITTER')],["app", "full_name"])


In [29]:
app_df.show()

+-------+---------+
|    app|full_name|
+-------+---------+
|     FB| FACEBOOK|
|  Insta|INSTAGRAM|
|Twitter|  TWITTER|
+-------+---------+



Ya con el dataframe estatico disponible, simplemente escribimos una nueva query para hacer join de los datos que venimos trabajando, con los datos estaticos recien generados.

In [30]:
app_stream_df=data.join(app_df,'app')

join_query=(app_stream_df.writeStream.queryName('join_query').outputMode('append').format('memory').start())

In [31]:
spark.sql("select * from join_query ").toPandas().head(50)

Unnamed: 0,app,user_id,time_in_secs,age,full_name
0,FB,XN201,10,19,FACEBOOK
1,FB,XN210,500,50,FACEBOOK
2,FB,XN203,500,30,FACEBOOK
3,FB,XN203,500,30,FACEBOOK
4,FB,XN203,100,30,FACEBOOK
5,FB,XN203,300,30,FACEBOOK
6,FB,XN202,2000,45,FACEBOOK
7,Insta,XN255,30,23,INSTAGRAM
8,Insta,XN201,30,19,INSTAGRAM
9,Insta,XN201,30,19,INSTAGRAM


Como se aprecia, tenemos ahora una columna adicional (full_name) en el dataframe streaming. 