In [0]:
%run /Users/elagab.delivery@gmail.com/load_access_keys

In [0]:
%run /Users/elagab.delivery@gmail.com/cleaning_tools

In [0]:
%sql
-- Disable format checks during the reading of Delta tables.
-- SET changes the configuration of the current Spark session only, so this cell
-- must be re-run in every new session before the Delta tables are read.
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *


In [0]:
# streaming tools
def create_schema(columns_list):
    """Build a Spark schema in which every listed column is a nullable string.

    Args:
        columns_list: Column names, in the order they should appear in the schema.

    Returns:
        A StructType with one StructField(name, StringType(), True) per name.
    """
    return StructType([StructField(name, StringType(), True) for name in columns_list])


def create_streaming_dataframe(stream_name, schema, ACCESS_KEY, SECRET_KEY,
                               region='us-east-1', initial_position='earliest'):
    """Open a Kinesis stream as a streaming DataFrame and parse its JSON payload.

    The schema parameter was previously misspelled ``sceham``, so the function
    silently used whatever notebook-level ``schema`` happened to exist instead
    of its own argument.

    Args:
        stream_name: Name of the Kinesis stream to read.
        schema: StructType describing the JSON records (e.g. from create_schema).
        ACCESS_KEY: AWS access key id passed to the Kinesis source.
        SECRET_KEY: AWS secret access key passed to the Kinesis source.
        region: AWS region of the stream (default 'us-east-1').
        initial_position: Starting position for a new query (default 'earliest').

    Returns:
        A streaming DataFrame with one top-level column per field of ``schema``.
    """
    raw_df = (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .option('format', 'json')
        .load()
    )
    # Cast the record payload column `data` to a string so from_json can parse it,
    # then flatten the parsed struct into top-level columns.
    return (
        raw_df
        .selectExpr("CAST(data as STRING)")
        .select(from_json("data", schema).alias("json"))
        .select("json.*")
    )

def write_stream_into_delta_table(table_name, stream_df=None, checkpoint_location=None):
    """Start an append-mode streaming write of a DataFrame into a Delta table.

    Args:
        table_name: Name of the target Delta table.
        stream_df: Streaming DataFrame to write. When omitted, the notebook-level
            ``df`` is written, which matches the original one-argument call form.
        checkpoint_location: Checkpoint directory for this query. Defaults to
            /tmp/kinesis/_checkpoints/<table_name>/ so each table's query keeps
            its own progress. One directory shared by several queries mixes
            their offsets and commits.

    Returns:
        The StreamingQuery returned by the writer (the original returned None).

    NOTE: a query pointed at a new, empty checkpoint starts again from the
    source's initial position.
    """
    if stream_df is None:
        # Backward compatibility with write_stream_into_delta_table(table_name),
        # which streamed the notebook-level `df`.
        stream_df = df
    if checkpoint_location is None:
        checkpoint_location = "/tmp/kinesis/_checkpoints/" + table_name + "/"
    return (
        stream_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        .table(table_name)
    )


In [0]:
# Extract: open one Kinesis stream per entity (pin, geo, user).
# Every column is declared via create_schema; typing is left to the cleaning step.
stream_columns = {
    'pin': ['category', 'description', 'downloaded', 'follower_count', 'image_src', 'index',
            'is_image_or_video', 'poster_name', 'save_location', 'tag_list', 'title', 'unique_id'],
    'geo': ['country', 'ind', 'latitude', 'longitude', 'timestamp'],
    'user': ["ind", "first_name", "last_name", "age", "date_joined"],
}
raw_streams = {}
for entity, columns_list in stream_columns.items():
    schema = create_schema(columns_list)
    raw_streams[entity] = create_streaming_dataframe(
        'streaming-12f2c229fbdb-' + entity, schema, ACCESS_KEY, SECRET_KEY)
df_pin_stream = raw_streams['pin']
df_geo_stream = raw_streams['geo']
df_user_stream = raw_streams['user']


In [0]:
# Clean: apply the entity-specific cleaning helpers (presumably defined by the
# %run cleaning_tools notebook) to each raw stream.
clean_df_pin_stream, clean_df_geo_stream, clean_df_user_stream = (
    clean_df_pin(df_pin_stream),
    clean_df_geo(df_geo_stream),
    clean_df_user(df_user_stream),
)

In [0]:
# Load: write each cleaned stream into its own Delta table.
# `df` is bound to the matching cleaned stream before each call, because
# write_stream_into_delta_table(table_name) streams the notebook-level `df`.
# Previously the same `df` was written into all three tables.
for table_name, df in [
    ("12f2c229fbdb_pin_table", clean_df_pin_stream),
    ("12f2c229fbdb_geo_table", clean_df_geo_stream),
    ("12f2c229fbdb_user_table", clean_df_user_stream),
]:
    write_stream_into_delta_table(table_name)

In [0]:
# Recursively delete every streaming checkpoint under /tmp/kinesis/_checkpoints/.
# NOTE(review): deleting the checkpoint of a query that is still running can break
# that query's progress tracking; stop the affected streams first. Streams started
# afterwards begin again from the source's initial position.
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", recurse=True)

In [0]:
# geo: re-launch the geo stream into its Delta table (e.g. after checkpoints were cleared).
# `df` is bound to the cleaned geo stream because
# write_stream_into_delta_table(table_name) streams the notebook-level `df`.
# NOTE(review): if the geo query from the load cell is still active, this starts
# a second writer into the same table.
table_name = "12f2c229fbdb_geo_table"
df = clean_df_geo_stream
write_stream_into_delta_table(table_name)

In [0]:
# user: re-launch the user stream into its Delta table (e.g. after checkpoints were cleared).
# `df` is bound to the cleaned user stream because
# write_stream_into_delta_table(table_name) streams the notebook-level `df`.
# NOTE(review): if the user query from the load cell is still active, this starts
# a second writer into the same table.
table_name = "12f2c229fbdb_user_table"
df = clean_df_user_stream
write_stream_into_delta_table(table_name)

In [0]:
# ---------------------------------------

In [0]:
%sql
-- Sanity check: count rows with a non-null `ind` in the geo Delta table
-- populated by the streaming writes above.
SELECT COUNT(ind) FROM 12f2c229fbdb_geo_table;

count(ind)
15928
