# Streaming estructurado con Python

In [2]:
from pyspark.sql.functions import rand
from pyspark.sql.functions import expr

# Run every shuffle stage with a single partition: the demo streams below are
# tiny, so the default of 200 shuffle partitions would only add task overhead.
spark.conf.set(key="spark.sql.shuffle.partitions", value="1")

In [3]:
# Impressions stream: a synthetic "rate" source emitting 5 rows per second on a
# single partition. Its counter column `value` (0, 1, 2, ...) is used as the ad
# id, and the row timestamp as the impression time.
impressions = (
    spark.readStream
    .format("rate")
    .options(rowsPerSecond="5", numPartitions="1")
    .load()
    .selectExpr("value AS adId", "timestamp AS impressionTime")
)



In [4]:
# Clicks stream: a second "rate" source at the same speed.
# - Roughly 10% of its rows are kept (an unseeded random filter, so which rows
#   survive differs on every run), simulating a click-through rate.
# - Ids are shifted back by 50, so a click refers to the impression emitted
#   50 rows (about 10 s at 5 rows/s) earlier when both sources start together.
# - Non-positive ids produced by the shift are discarded.
clicks = (
    spark.readStream
    .format("rate")
    .options(rowsPerSecond="5", numPartitions="1")
    .load()
    .filter((rand() * 100).cast("integer") < 10)
    .selectExpr("(value - 50) AS adId ", "timestamp AS clickTime")
    .filter("adId > 0")
)

In [5]:
# Display the data from the impressions stream (columns adId, impressionTime).
# NOTE: calling display() on a streaming DataFrame starts a streaming query
# that keeps running and refreshing its output until it is stopped.
display(impressions)

adId,impressionTime
0,2020-09-07T03:46:42.320+0000
1,2020-09-07T03:46:42.520+0000
2,2020-09-07T03:46:42.720+0000
3,2020-09-07T03:46:42.920+0000
4,2020-09-07T03:46:43.120+0000
5,2020-09-07T03:46:43.320+0000
6,2020-09-07T03:46:43.520+0000
7,2020-09-07T03:46:43.720+0000
8,2020-09-07T03:46:43.920+0000
9,2020-09-07T03:46:44.120+0000


In [6]:
# Display the data from the clicks stream (columns adId, clickTime).
# Only about 10% of the generated rows pass the random filter, so the ids
# shown have gaps. This also starts a long-running streaming query.
display(clicks)

adId,clickTime
16,2020-09-07T03:47:13.392+0000
17,2020-09-07T03:47:13.592+0000
20,2020-09-07T03:47:14.192+0000
24,2020-09-07T03:47:14.992+0000
33,2020-09-07T03:47:16.792+0000
41,2020-09-07T03:47:18.392+0000
63,2020-09-07T03:47:22.792+0000
82,2020-09-07T03:47:26.592+0000
99,2020-09-07T03:47:29.992+0000
124,2020-09-07T03:47:34.992+0000


In [7]:
# Join both streams on their shared adId column and display the result.
# NOTE: neither input has a watermark, so Spark has to keep every row of both
# streams as join state indefinitely; the watermarked join further down is the
# bounded-state version of this query.
display(impressions.join(clicks, on="adId", how="inner"))

adId,impressionTime,clickTime
9,2020-09-07T03:48:38.261+0000,2020-09-07T03:48:49.005+0000
21,2020-09-07T03:48:40.661+0000,2020-09-07T03:48:51.405+0000
23,2020-09-07T03:48:41.061+0000,2020-09-07T03:48:51.805+0000
26,2020-09-07T03:48:41.661+0000,2020-09-07T03:48:52.405+0000
34,2020-09-07T03:48:43.261+0000,2020-09-07T03:48:54.005+0000
37,2020-09-07T03:48:43.861+0000,2020-09-07T03:48:54.605+0000
64,2020-09-07T03:48:49.261+0000,2020-09-07T03:49:00.005+0000
68,2020-09-07T03:48:50.061+0000,2020-09-07T03:49:00.805+0000
77,2020-09-07T03:48:51.861+0000,2020-09-07T03:49:02.605+0000
97,2020-09-07T03:48:55.861+0000,2020-09-07T03:49:06.605+0000


### Inner Join con Watermarking

In [9]:


# Define watermarks: how late each stream's events are allowed to arrive.
# Spark uses them to decide when buffered join state for old rows can be
# dropped. The id columns are renamed so that the join condition below can
# reference both sides unambiguously.
impressionsWithWatermark = (
    impressions
    .selectExpr("adId AS impressionAdId", "impressionTime")
    .withWatermark("impressionTime", "10 seconds")  # max 10 seconds late
)
clicksWithWatermark = (
    clicks
    .selectExpr("adId AS clickAdId", "clickTime")
    .withWatermark("clickTime", "20 seconds")  # max 20 seconds late
)




In [10]:

# Inner join with time-range conditions: a click matches an impression with the
# same ad id only if it happened at or after the impression and at most one
# minute later. Together with the watermarks on both inputs, this time bound
# lets Spark eventually discard old buffered rows from the join state.
display(
  impressionsWithWatermark.join(
    clicksWithWatermark,
    expr(""" 
      clickAdId = impressionAdId AND 
      clickTime >= impressionTime AND 
      clickTime <= impressionTime + interval 1 minutes    
      """
    )
  )
)

impressionAdId,impressionTime,clickAdId,clickTime
21,2020-09-07T03:49:26.698+0000,21,2020-09-07T03:49:37.277+0000
23,2020-09-07T03:49:27.098+0000,23,2020-09-07T03:49:37.677+0000
27,2020-09-07T03:49:27.898+0000,27,2020-09-07T03:49:38.477+0000
41,2020-09-07T03:49:30.698+0000,41,2020-09-07T03:49:41.277+0000
42,2020-09-07T03:49:30.898+0000,42,2020-09-07T03:49:41.477+0000
46,2020-09-07T03:49:31.698+0000,46,2020-09-07T03:49:42.277+0000
55,2020-09-07T03:49:33.498+0000,55,2020-09-07T03:49:44.077+0000
59,2020-09-07T03:49:34.298+0000,59,2020-09-07T03:49:44.877+0000
62,2020-09-07T03:49:34.898+0000,62,2020-09-07T03:49:45.477+0000
63,2020-09-07T03:49:35.098+0000,63,2020-09-07T03:49:45.677+0000


In [11]:
from pyspark.sql import SparkSession
# Finalize the streaming queries started by the display() cells.
# awaitAnyTermination() on its own only *waits* until some query terminates; it
# does not stop anything. Called alone while the display() queries keep running,
# it would block this cell indefinitely. So stop every active query first.
active_queries = spark.streams.active
for query in active_queries:
    query.stop()
if active_queries:
    # After stop(), this returns immediately, or re-raises the error of a
    # query that terminated with an exception. Skipped when nothing was
    # running, because with no terminated query it would block forever.
    spark.streams.awaitAnyTermination()