## Load Data Lakes Tables - Batch and Streaming
For the streaming portion, data must be streaming to the topic "demo-message-1".

*Note: If it is not working, try changing the GROUP_ID and Consumer Group values to reset.*

### Shared imports and variables
Run this first since most cells below need at least one of these imports or variables

In [3]:
from pyspark.sql.functions import col, desc, regexp_replace, substring, to_date, from_json, explode, expr
from pyspark.sql.types import StructType, StringType

# Delta table locations under the /mnt/adlsdemo mount, shared by the cells below.
taxi_zone_path = "/mnt/adlsdemo/nyctaxi/lookups/taxi_zone"
taxi_rate_path = "/mnt/adlsdemo/nyctaxi/lookups/taxi_rate_code"
yellow_delta_path = "/mnt/adlsdemo/nyctaxi/tripdata/yellow_delta"

# Datetime pattern handed to to_date() for the pickup/dropoff timestamp strings.
date_format = "yyyy-MM-dd HH:mm:ss"

# (column name, Spark SQL type) for one yellow-taxi trip record, in column order.
# The two timestamp columns stay strings here; dates are derived from them later.
_trip_columns = [
    ("VendorID", "integer"),
    ("tpep_pickup_datetime", "string"),
    ("tpep_dropoff_datetime", "string"),
    ("passenger_count", "integer"),
    ("trip_distance", "double"),
    ("RatecodeID", "integer"),
    ("store_and_fwd_flag", "string"),
    ("PULocationID", "integer"),
    ("DOLocationID", "integer"),
    ("payment_type", "integer"),
    ("fare_amount", "double"),
    ("extra", "double"),
    ("mta_tax", "double"),
    ("tip_amount", "double"),
    ("tolls_amount", "double"),
    ("improvement_surcharge", "double"),
    ("total_amount", "double"),
]

# Define a schema that Spark understands. This is one of several ways to do it:
# each add() call appends one field and hands back the StructType.
trip_schema = StructType()
for _column_name, _column_type in _trip_columns:
    trip_schema = trip_schema.add(_column_name, _column_type)


### Simple load of lookup data
Read data from shared databricks folder and save in delta format within Azure Data Lake Storage (ADLS).

In [5]:
# Read the taxi zone lookup CSV: the first line supplies the column names and
# the column types are inferred from the data.
input_df = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("/databricks-datasets/nyctaxi/taxizone/taxi_zone_lookup.csv")
)

# Rename service_zone so it matches the casing of the other lookup columns.
df = input_df.withColumnRenamed("service_zone", "ServiceZone")

# Replace whatever is currently stored at the lookup location with this data.
df.write.save(taxi_zone_path, format="delta", mode="overwrite")

display(df)

LocationID,Borough,Zone,ServiceZone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
3,Bronx,Allerton/Pelham Gardens,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
5,Staten Island,Arden Heights,Boro Zone
6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone
7,Queens,Astoria,Boro Zone
8,Queens,Astoria Park,Boro Zone
9,Queens,Auburndale,Boro Zone
10,Queens,Baisley Park,Boro Zone


### Batch Load of historical data
Read historical csv data from shared databricks folder and save in delta format within Azure Data Lake Storage (ADLS).

In [7]:
# To delete the trips table before starting, uncomment the following line.
# dbutils.fs.rm(yellow_delta_path,recurse=True)

# Read all 2019 yellow-taxi CSV files with the explicit trip_schema, treating
# the first line of each file as a header.
input_df = (
    spark.read
    .schema(trip_schema)
    .options(header="true")
    .csv("dbfs:/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-*")
)

# Two ways to add the same derived columns: column functions or SQL expressions.
# Only one is needed. Both statements run, and since Option B is assigned last
# it is the result used by the join below.

# Option A: column functions
transformed_df = (
    input_df
    .withColumn(
        "year_month",
        regexp_replace(substring("tpep_pickup_datetime", 1, 7), '-', '_'),
    )
    .withColumn("pickup_dt", to_date("tpep_pickup_datetime", date_format))
    .withColumn("dropoff_dt", to_date("tpep_dropoff_datetime", date_format))
    .withColumn("tip_pct", col("tip_amount") / col("total_amount"))
)

# Option B: SQL expressions
derived_sql = [
    "replace(left(tpep_pickup_datetime, 7),'-','_') as year_month",
    f"to_date(tpep_pickup_datetime, '{date_format}') as pickup_dt",
    f"to_date(tpep_dropoff_datetime, '{date_format}') as dropoff_dt",
    "tip_amount/total_amount as tip_pct",
]
transformed_df = input_df.selectExpr("*", *derived_sql)

zone_df = spark.read.format("delta").load(taxi_zone_path)

# Join to bring in Taxi Zone data for the pickup location. A left join keeps
# trips whose PULocationID has no match in the lookup.
pickup_match = transformed_df["PULocationID"] == zone_df["LocationID"]
trip_df = (
    transformed_df
    .join(zone_df, on=pickup_match, how="left")
    .drop("LocationID")
    # NOTE(review): the lookup column is spelled "Borough", so this rename
    # matches nothing and the column is written as "Borough". Correcting it
    # changes the table schema, so it must be done together with the streaming
    # cell that appends to the same Delta location.
    .withColumnRenamed("Burough", "PickupBurrough")
    .withColumnRenamed("Zone", "PickupZone")
    .withColumnRenamed("ServiceZone", "PickupServiceZone")
)

# Overwrite the trips table, one partition directory per year_month value.
(
    trip_df.write
    .format("delta")
    .partitionBy("year_month")
    .mode("overwrite")
    .save(yellow_delta_path)
)


### Stream Load of incoming data - Trips for 2019-12
Read streaming data from Event Hubs (using Apache Kafka API) and save in the same delta location within Azure Data Lake Storage (ADLS).

In [9]:
# Use the pre-Spark-3.0 datetime parser for the to_date() calls in this cell.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Suffix for the consumer group id and the checkpoint paths; changing it starts
# a run with a new group and new checkpoints.
run_version = "v25"

topic = 'demo-message-1'

# To set up a Key Vault backed secret scope for the first time, replace items in url and follow instructions:
#   https://<databricks-instance>/#secrets/createScopeSetup

# The "password" is really an Event Hub connection string, for example -> Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=ReadWriteTmp;SharedAccessKey=vhNXxXXXXXxxxXXXXXXXxx=;EntityPath=demo-message-1
password = dbutils.secrets.get(scope="demo", key=f"eh-sasl-{topic}")

# JAAS login line for the Kafka client: the user name is the literal
# "$ConnectionString" and the connection string is sent as the password.
EH_SASL = f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{password}";'
GROUP_ID = f'tst-group-{run_version}'

consumer_config = {
    # Where to connect and which topic to read
    'kafka.bootstrap.servers': 'dustin-demo-eh.servicebus.windows.net:9093',
    'subscribe': topic,
    # Authentication
    'kafka.security.protocol': 'SASL_SSL',
    'kafka.sasl.mechanism': 'PLAIN',
    'kafka.sasl.jaas.config': EH_SASL,
    # Consumer group and timeouts
    'kafka.group.id': GROUP_ID,
    'kafka.request.timeout.ms': "60000",
    'kafka.session.timeout.ms': "20000",
    'kafka.heartbeat.interval.ms': "10000",
}

# Read from Kafka, format will be a kafka record
input_df = (
    spark.readStream
    .format("kafka")
    .options(**consumer_config)
    .load()
)

# Cast just the value as a string (instead of bytes) then use from_json to convert to an object matching the schema
message_text = col('value').cast('string')
json_df = input_df.select(from_json(message_text, trip_schema).alias("json"))

# Select all attributes from json as individual columns, then add the same
# derived columns as the batch load
transformed_df = (
    json_df
    .select("json.*")
    .withColumn(
        "year_month",
        regexp_replace(substring("tpep_pickup_datetime", 1, 7), '-', '_'),
    )
    .withColumn("pickup_dt", to_date("tpep_pickup_datetime", date_format))
    .withColumn("dropoff_dt", to_date("tpep_dropoff_datetime", date_format))
    .withColumn("tip_pct", col("tip_amount") / col("total_amount"))
)

# Join in lookup data
zone_df = spark.read.format("delta").load(taxi_zone_path)
pickup_match = transformed_df["PULocationId"] == zone_df["LocationID"]
trip_df = (
    transformed_df
    .join(zone_df, on=pickup_match, how="left")
    .drop("LocationID")
    # NOTE(review): the lookup column is spelled "Borough", so this rename
    # matches nothing; the displayed result keeps a "Borough" column.
    .withColumnRenamed("Burough", "PickupBurrough")
    .withColumnRenamed("Zone", "PickupZone")
    .withColumnRenamed("ServiceZone", "PickupServiceZone")
)

display(trip_df)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,year_month,pickup_dt,dropoff_dt,tip_pct,Borough,PickupZone,PickupServiceZone
1,2019-12-01 00:09:18,2019-12-01 00:35:34,2,5.4,1,N,230,7,1,21.5,3.0,0.5,1.0,0.0,0.3,26.3,2019_12,2019-12-01,2019-12-01,0.0380228136882129,Manhattan,Times Sq/Theatre District,Yellow Zone
2,2019-12-01 00:55:19,2019-12-01 01:12:34,1,8.33,1,N,138,151,2,24.5,0.5,0.5,0.0,6.12,0.3,31.92,2019_12,2019-12-01,2019-12-01,0.0,Queens,LaGuardia Airport,Airports
1,2019-12-01 00:04:03,2019-12-01 00:14:45,1,1.3,1,N,125,148,2,8.5,3.0,0.5,0.0,0.0,0.3,12.3,2019_12,2019-12-01,2019-12-01,0.0,Manhattan,Hudson Sq,Yellow Zone
1,2019-12-01 00:23:31,2019-12-01 00:35:29,1,3.2,1,N,79,141,1,11.5,3.0,0.5,2.0,0.0,0.3,17.3,2019_12,2019-12-01,2019-12-01,0.1156069364161849,Manhattan,East Village,Yellow Zone
1,2019-12-01 00:47:03,2019-12-01 00:52:58,1,1.4,1,N,237,163,1,6.5,3.0,0.5,2.05,0.0,0.3,12.35,2019_12,2019-12-01,2019-12-01,0.165991902834008,Manhattan,Upper East Side South,Yellow Zone
1,2019-12-01 00:12:04,2019-12-01 00:15:19,2,0.3,1,N,236,237,1,4.0,3.0,0.5,1.5,0.0,0.3,9.3,2019_12,2019-12-01,2019-12-01,0.1612903225806451,Manhattan,Upper East Side North,Yellow Zone
1,2019-12-01 00:23:03,2019-12-01 00:34:18,1,3.2,1,N,229,7,2,12.0,3.0,0.5,0.0,0.0,0.3,15.8,2019_12,2019-12-01,2019-12-01,0.0,Manhattan,Sutton Place/Turtle Bay North,Yellow Zone
2,2019-11-30 23:59:12,2019-12-01 00:06:25,3,1.74,1,N,79,170,1,7.5,0.5,0.5,2.26,0.0,0.3,13.56,2019_11,2019-11-30,2019-12-01,0.1666666666666666,Manhattan,East Village,Yellow Zone
2,2019-12-01 00:35:53,2019-12-01 00:51:05,2,2.72,1,N,246,233,1,12.0,0.5,0.5,3.16,0.0,0.3,18.96,2019_12,2019-12-01,2019-12-01,0.1666666666666666,Manhattan,West Chelsea/Hudson Yards,Yellow Zone
2,2019-12-01 00:29:09,2019-12-01 00:42:03,1,5.71,1,N,148,226,1,17.5,0.5,0.5,4.26,0.0,0.3,25.56,2019_12,2019-12-01,2019-12-01,0.1666666666666666,Manhattan,Lower East Side,Yellow Zone


## Azure Storage as a destination
* One option for streaming output is to write directly to your data lake storage (Azure Data Lake Storage Gen 2 or standard Azure Blob Storage).
* Databricks Delta / Delta Lake file format makes this more efficient, but could do with Parquet, Avro or other formats.

In [11]:
# Append the streaming trips to the same Delta location the batch load wrote,
# partitioned by year_month. The checkpoint path includes run_version.
(
    trip_df.writeStream
    .option("checkpointLocation", f"/delta/events/_checkpoints/streaming_demo_{run_version}")
    .start(
        path=yellow_delta_path,
        format="delta",
        outputMode="append",
        partitionBy="year_month",
    )
)

In [12]:
# Batch alternative (not used): look at the first 1000 rows of the table.
# delta_batch_df = spark.read.format("delta").load(yellow_delta_path).limit(1000)
# display(delta_batch_df)

# Read the trips table as a stream, keeping only rows with one exact pickup
# timestamp string.
delta_stream_df = (
    spark.readStream
    .format("delta")
    .load(yellow_delta_path)
    .where(col("tpep_pickup_datetime") == "2019-12-01 00:18:15")
)
display(delta_stream_df)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,year_month,pickup_dt,dropoff_dt,tip_pct,Borough,PickupZone,PickupServiceZone
2,2019-12-01 00:18:15,2019-12-01 00:39:45,1,5.08,1,N,163,223,1,19.0,0.5,0.5,2.0,0.0,0.3,24.8,2019_12,2019-12-01,2019-12-01,0.0806451612903225,Manhattan,Midtown North,Yellow Zone
2,2019-12-01 00:18:15,2019-12-01 00:21:35,2,1.25,1,N,141,229,1,5.5,0.5,0.5,1.0,0.0,0.3,10.3,2019_12,2019-12-01,2019-12-01,0.0970873786407767,Manhattan,Lenox Hill West,Yellow Zone
2,2019-12-01 00:18:15,2019-12-01 00:29:05,1,1.52,1,N,137,79,1,9.0,0.5,0.5,3.84,0.0,0.3,16.64,2019_12,2019-12-01,2019-12-01,0.2307692307692307,Manhattan,Kips Bay,Yellow Zone


### Alternatively: Send transformed data to Event Hubs for next steps in pipeline

In [14]:
topic2 = 'demo-message-transformed'

# Connection string for the output topic.
# NOTE(review): the input topic's secret is read from scope "demo", while this
# one uses scope "data-lake-demo" -- confirm both scopes are intended.
password = dbutils.secrets.get(scope="data-lake-demo", key=f"eh-sasl-{topic2}")

# Rebuilds EH_SASL with the output topic's connection string; later cells that
# read EH_SASL see this value.
EH_SASL = f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{password}";'

producer_config = {
    # Where to connect and which topic to write
    'kafka.bootstrap.servers': 'dustin-demo-eh.servicebus.windows.net:9093',
    'topic': topic2,
    # Authentication
    'kafka.security.protocol': 'SASL_SSL',
    'kafka.sasl.mechanism': 'PLAIN',
    'kafka.sasl.jaas.config': EH_SASL,
    # Timeouts
    'kafka.request.timeout.ms': "60000",
    'kafka.session.timeout.ms': "30000",
}

# Shape each trip as a Kafka record: the vendor id as the key and the whole
# row serialized to JSON as the value.
kafka_columns = [
    "CAST(VendorId as STRING) as key",
    "to_json(struct(*)) as value",
]
kafka_output_df = trip_df.selectExpr(*kafka_columns)

# display(kafka_output_df)
(
    kafka_output_df.writeStream
    .format("kafka")
    .options(**producer_config)
    .option("checkpointLocation", f"/delta/events/_checkpoints/cp_{run_version}")
    .start()
)

## Tests and other alternative syntax
Collection of examples that are not used in the demo but show other capabilities and syntax.

In [16]:
# Example of grouping the data in a stream, which requires defining a window

# from pyspark.sql.functions import window  

# grouped_df = (
#       trips2_df
#         .groupBy(
#            col("passenger_count"),
#            window("tpep_pickup_datetime", "10 minutes"))
#         .sum("trip_distance")
#       )

# # display(grouped_df)

In [17]:
# # Example to get actual json schema from sample file (avoid typing it all manually)
# file_location = "/mnt/blobdemo/nyc_taxi_raw/yellow_tripdata_2018-01.csv"
# sample_df = spark.read \
#     .option('sep',',') \
#     .option("inferSchema","true") \
#     .option("header", "true") \
#     .csv(file_location)
# json_schema = sample_df.schema

In [18]:
# Batch test: take up to 1000 saved trips and join the zone lookup on the
# pickup location.
# NOTE(review): this rebinds transformed_df, zone_df and trip_df to batch
# DataFrames, and the saved trips already carry zone columns from the load
# cells, so the join adds a second set.
transformed_df = spark.read.load(yellow_delta_path, format="delta").limit(1000)
zone_df = spark.read.load(taxi_zone_path, format="delta")
pickup_match = transformed_df["PULocationID"] == zone_df["LocationID"]
trip_df = (
    transformed_df
    .join(zone_df, on=pickup_match, how="left")
    .drop("LocationID")
)
display(trip_df)

In [19]:
# Read back the transformed topic with its own consumer group.
# NOTE(review): EH_SASL is whatever value was assigned most recently; this
# presumably needs the one built for topic2 -- run the producer cell first.
GROUP_ID = f"test-group-2-{run_version}"
consumer2_config = {
    # Where to connect and which topic to read
    'kafka.bootstrap.servers': 'dustin-demo-eh.servicebus.windows.net:9093',
    'subscribe': topic2,
    # Authentication
    'kafka.security.protocol': 'SASL_SSL',
    'kafka.sasl.mechanism': 'PLAIN',
    'kafka.sasl.jaas.config': EH_SASL,
    # Consumer group and timeouts
    'kafka.group.id': GROUP_ID,
    'kafka.request.timeout.ms': "60000",
    'kafka.session.timeout.ms': "20000",
}

df3 = (
    spark.readStream
    .format("kafka")
    .options(**consumer2_config)
    .load()
)

# Keep only the message value, as text instead of bytes.
df3 = df3.select(col("value").cast("string"))

# display(df3)

# display(df3)

In [20]:
# output_path = f"/mnt/adlsdemo/streaming_output_{run_version}"

# delta_stream_df = spark.readStream.format("delta").load(output_path)
# delta_stream_df.writeStream.format("delta").option("checkpointLocation", "/mnt/adlsdemo/checkpoints/demo_v1").start(output_path+"2")
#  display(delta_stream_df)

In [21]:
# Read the rate code lookup CSV: the first line supplies the column names and
# the column types are inferred from the data.
df = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("/databricks-datasets/nyctaxi/taxizone/taxi_rate_code.csv")
)

# Replace whatever is currently stored at the rate code lookup location.
df.write.save(taxi_rate_path, format="delta", mode="overwrite")

display(df)

RateCodeID,RateCodeDesc
1,Standard Rate
2,JFK
3,Newark
4,Nassau or Westchester
5,Negotiated fare
6,Group ride


In [22]:
# Read the December 2019 trips file with the explicit trip_schema, treating
# the first line as a header.
input_df = (
    spark.read
    .schema(trip_schema)
    .options(header="true")
    .csv("dbfs:/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-12.csv.gz")
)

# Collapse to a single partition and write the rows out as JSON.
# No save mode is set here.
single_partition_df = input_df.coalesce(1)
single_partition_df.write.format("json").save("dbfs:/data/nyctaxi/december_json")

In [23]:
# List the files under the v20 streaming output folder (same listing as the
# %fs ls magic, written as a regular Python call).
display(dbutils.fs.ls("/mnt/adlsdemo/streaming_output_v20/"))

#### Possible error  
Caused by: org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: Fail to recognize 'YYYY-MM-dd hh:mm:ss' pattern in the DateTimeFormatter. 1) You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0. 2) You can form a valid datetime pattern with the guide from https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html