### Ingest and process real-time data streams with Azure Synapse Analytics

Adapted from https://www.mssqltips.com/sqlservertip/6748/real-time-data-streams-azure-synapse-analytics/

In [1]:
from pyspark.sql.functions import *
from datetime import datetime
from dateutil import parser

# from azureml.opendatasets import NycTlcGreen
# data = NycTlcGreen()

from azureml.opendatasets import NycTlcYellow

# Date window passed to the NYC yellow-taxi open dataset for the initial load
# (start_date = 2018-05-01, end_date = 2018-06-06, both at midnight).
start_date = datetime(2018, 5, 1)
end_date = datetime(2018, 6, 6)
data = NycTlcYellow(start_date=start_date, end_date=end_date)

# Convert the dataset to a Spark DataFrame and print its schema.
data_df = data.to_spark_dataframe()
data_df.printSchema()

StatementMeta(jbjsynspark, 4, 1, Finished, Available)

root
 |-- vendorID: string (nullable = true)
 |-- tpepPickupDateTime: timestamp (nullable = true)
 |-- tpepDropoffDateTime: timestamp (nullable = true)
 |-- passengerCount: integer (nullable = true)
 |-- tripDistance: double (nullable = true)
 |-- puLocationId: string (nullable = true)
 |-- doLocationId: string (nullable = true)
 |-- startLon: double (nullable = true)
 |-- startLat: double (nullable = true)
 |-- endLon: double (nullable = true)
 |-- endLat: double (nullable = true)
 |-- rateCodeId: integer (nullable = true)
 |-- storeAndFwdFlag: string (nullable = true)
 |-- paymentType: string (nullable = true)
 |-- fareAmount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mtaTax: double (nullable = true)
 |-- improvementSurcharge: string (nullable = true)
 |-- tipAmount: double (nullable = true)
 |-- tollsAmount: double (nullable = true)
 |-- totalAmount: double (nullable = true)
 |-- puYear: integer (nullable = true)
 |-- puMonth: integer (nullable = true)

In [2]:
# Delta location of the raw taxi trips; the streaming cell reads from this path.
deltaPath = '/synapse/Streaming/NYTaxiRaw'

# One-time seed of the raw Delta table. Left disabled because it has already been run.
# data_df.write.format('delta').mode('overwrite').option('overwriteSchema', 'true').save(deltaPath)

StatementMeta(jbjsynspark, 4, 2, Finished, Available)



In [3]:
# Target location of the processed Delta table and the checkpoint directory
# the streaming query uses to track its progress.
deltaProcessedPath = '/synapse/Streaming/NYTaxiSilver'
deltaCheckpointPath = '/synapse/Streaming/NYTaxiCheckpointSilver'

# NOTE: `col` and `round` come from `from pyspark.sql.functions import *`, so
# `round` below is the Spark column function, not the Python builtin.
# Despite the `df` prefix, dfProcStream holds what writeStream.start() returns;
# later cells call isActive / status / stop() on it.
dfProcStream = (
    spark.readStream.format('delta')
    .option('ignoreChanges', 'true')
    .load(deltaPath)
    # Trip length in minutes: both timestamps are cast to long (epoch seconds),
    # subtracted, divided by 60 and rounded.
    .withColumn(
        'TripDuration_Min',
        round((col('tpepDropoffDateTime').cast('long') - col('tpepPickupDateTime').cast('long')) / 60),
    )
    .selectExpr(
        'tpepPickupDateTime as PickupTime',
        'tpepDropoffDateTime as DropoffTime',
        'TripDuration_Min',
        'fareAmount',
        'tipAmount',
    )
    .writeStream.format('delta')
    # Previously .option('mode', 'overwrite'). That is a batch-writer idiom, not a
    # streaming-writer setting, so the query ran in the default append mode anyway.
    # State the mode explicitly so the code matches what actually happens.
    .outputMode('append')
    .option('checkpointLocation', deltaCheckpointPath)
    .start(deltaProcessedPath)
)

StatementMeta(jbjsynspark, 4, 3, Finished, Available)



In [4]:
# Show whether the streaming query started above is still running.
print(dfProcStream.isActive)

StatementMeta(jbjsynspark, 4, 4, Finished, Available)

True

In [5]:
# Show the query's current status (message, isDataAvailable, isTriggerActive).
print(dfProcStream.status)

StatementMeta(jbjsynspark, 4, 5, Finished, Available)

{'message': 'Waiting for data to arrive', 'isDataAvailable': False, 'isTriggerActive': False}

In [6]:
%%sql
-- Register the processed Delta folder as table NYTaxi so it can be queried with SQL.
-- IF NOT EXISTS keeps the cell re-runnable when the table is already registered.
CREATE TABLE IF NOT EXISTS NYTaxi
USING DELTA
LOCATION '/synapse/Streaming/NYTaxiSilver'

StatementMeta(jbjsynspark, 4, 6, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

In [7]:
%%sql
-- Preview up to 100 rows of the processed table.
SELECT *
FROM NYTaxi
LIMIT 100

StatementMeta(jbjsynspark, 4, 7, Finished, Available)

<Spark SQL result set with 100 rows and 5 fields>

In [10]:
# Second batch: load July 2018 trips so they can be appended to the raw Delta
# table while the streaming query is running.

from azureml.opendatasets import NycTlcYellow

start_date = datetime(2018, 7, 1)
end_date = datetime(2018, 7, 31)
data = NycTlcYellow(start_date=start_date, end_date=end_date)

data_df = data.to_spark_dataframe()

deltaPath = '/synapse/Streaming/NYTaxiRaw'

# Append step; left disabled because it has already been run.
# data_df.write.format('delta').mode('append').save(deltaPath)

StatementMeta(jbjsynspark, 4, 10, Finished, Available)



In [12]:
# Check the streaming query's status again after the append attempt.
print(dfProcStream.status)

StatementMeta(, , , Waiting, )

{'message': 'Waiting for data to arrive', 'isDataAvailable': False, 'isTriggerActive': False}

In [13]:
%%sql
-- 100 most recent trips whose PickupTime is later than the given timestamp literal.
SELECT *
FROM NYTaxi
WHERE PickupTime > '2018-06-07T00:00:00Z'
ORDER BY PickupTime DESC
LIMIT 100

StatementMeta(jbjsynspark, 4, 13, Finished, Available)

<Spark SQL result set with 100 rows and 5 fields>

In [15]:
%%sql
-- Number of rows whose PickupTime is later than the given timestamp literal.
SELECT count(1)
FROM NYTaxi
WHERE PickupTime > '2018-07-01T00:00:00Z'

StatementMeta(jbjsynspark, 4, 15, Finished, Available)

<Spark SQL result set with 1 rows and 1 fields>

In [16]:
# Stop the streaming query, then print its status to confirm it has stopped.
dfProcStream.stop()

print(dfProcStream.status)

StatementMeta(jbjsynspark, 4, 16, Finished, Available)

{'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}

In [26]:
%%sql
-- Clean-up: remove the NYTaxi table registration.
-- IF EXISTS keeps the cell re-runnable when the table has already been dropped.
DROP TABLE IF EXISTS NYTaxi

StatementMeta(jbjsynspark, 1, 26, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

In [17]:
# Print the streaming query's status once more.
print(dfProcStream.status)

StatementMeta(jbjsynspark, 4, 17, Finished, Available)

{'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}