In [0]:
from datetime import datetime
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, LongType

In [0]:
# Job parameters supplied through Databricks widgets. Naming caveat:
# `schema_name` is used as the catalog (first) part of the three-level table
# name and `database_name` as the schema (second) part.
schema_name, database_name, table_name, partition_column = (
    dbutils.widgets.get(widget)
    for widget in ('schema_name', 'database_name', 'table_name', 'partition_column')
)

# Raw landing volume that Auto Loader reads from, with the streaming
# checkpoint kept in a sibling directory next to it.
stream_source = f'/Volumes/raw/{database_name}/{table_name}'
stream_checkpoint = f'{stream_source}_checkpoint/'

# Explicit schema for the incoming JSON records; every field is nullable.
# `ref_date` and `ingestion_date` are the trailing columns (the latter is
# overwritten per micro-batch by the ingestion step).
stream_schema = (
    StructType()
    .add("abstract", StringType(), True)
    .add("byline", StringType(), True)
    .add("created_date", StringType(), True)
    .add("des_facet", ArrayType(StringType(), True), True)
    .add("geo_facet", ArrayType(StringType(), True), True)
    .add("item_type", StringType(), True)
    .add("kicker", StringType(), True)
    .add("material_type_facet", StringType(), True)
    .add(
        "multimedia",
        ArrayType(
            StructType()
            .add("caption", StringType(), True)
            .add("copyright", StringType(), True)
            .add("format", StringType(), True)
            .add("height", LongType(), True)
            .add("subtype", StringType(), True)
            .add("type", StringType(), True)
            .add("url", StringType(), True)
            .add("width", LongType(), True),
            True,
        ),
        True,
    )
    .add("org_facet", ArrayType(StringType(), True), True)
    .add("per_facet", ArrayType(StringType(), True), True)
    .add("published_date", StringType(), True)
    .add("section", StringType(), True)
    .add("short_url", StringType(), True)
    .add("subsection", StringType(), True)
    .add("title", StringType(), True)
    .add("updated_date", StringType(), True)
    .add("uri", StringType(), True)
    .add("url", StringType(), True)
    .add("ref_date", StringType(), True)
    .add("ingestion_date", StringType(), True)
)

In [0]:
# Function to identify if a table already exists
def table_exists(schema_name, database_name, table_name):
    """Return whether ``schema_name.database_name.table_name`` exists.

    ``schema_name`` is the catalog (first) part of the three-level name and
    ``database_name`` the schema (second) part, matching how the rest of the
    notebook addresses the table.

    Args:
        schema_name: Catalog name.
        database_name: Schema/database name inside the catalog.
        table_name: Table name.

    Returns:
        True if the table exists, False otherwise.
    """
    # Pass the fully qualified name (supported by Catalog.tableExists since
    # Spark 3.4) instead of calling setCurrentCatalog first, so a simple
    # existence check does not change the session's current catalog.
    return spark.catalog.tableExists(f'{schema_name}.{database_name}.{table_name}')

In [0]:
def ingestion(df):
    """Stamp a micro-batch with today's date and write it into the target table.

    Used as the ``foreachBatch`` handler. The batch is exposed as the temp
    view ``payload`` and written with ``INSERT INTO ... REPLACE USING
    (partition_column)``. That statement replaces target rows matching the
    batch on the ``partition_column`` widget value(s) (Databricks SQL
    semantics).

    Args:
        df: The micro-batch DataFrame delivered by Structured Streaming.
    """
    # Date is taken from the driver's clock at batch time.
    # NOTE(review): driver-local time zone — confirm this is the intended one.
    batch_df = df.withColumn('ingestion_date', lit(datetime.now().strftime('%Y-%m-%d')))
    batch_df.createOrReplaceTempView('payload')

    # The temp view lives in the micro-batch's own SparkSession. Inside
    # foreachBatch that session can differ from the notebook-global `spark`
    # (e.g. Spark Connect / shared clusters), so query through it directly.
    batch_df.sparkSession.sql(f'''
        INSERT INTO {schema_name}.{database_name}.{table_name}
        REPLACE USING ({partition_column})
        SELECT * FROM payload
    ''')


In [0]:
# Create the target Delta table (empty, with the stream schema) on first run.
if table_exists(schema_name, database_name, table_name):
    print(f'Table {table_name} already exists in {schema_name}.{database_name}')
else:
    print(f'Creating {table_name} table in {schema_name}.{database_name}...')

    # Empty DataFrame carrying the stream schema, so the table gets exactly
    # the column layout that micro-batches are later inserted with.
    df_creation = spark.createDataFrame([], schema=stream_schema)

    # Partition by the same widget value that the ingestion step passes to
    # REPLACE USING, instead of a hard-coded 'ref_date', so the two cannot
    # drift apart. A comma-separated widget value (as REPLACE USING accepts)
    # becomes multiple partition columns.
    partition_columns = [c.strip() for c in partition_column.split(',') if c.strip()]

    (df_creation.write
     .format('delta')
     .mode('overwrite')
     .partitionBy(*partition_columns)
     .saveAsTable(f'{schema_name}.{database_name}.{table_name}')
    )

    print(f'{table_name} table created successfully at {schema_name}.{database_name}')


In [0]:
# Incremental Auto Loader (cloudFiles) source over the raw JSON landing
# volume, read with the fixed `stream_schema` rather than an inferred one.
df_stream = (
    spark.readStream.format('cloudFiles')
    .options(**{'cloudFiles.format': 'json'})
    .schema(stream_schema)
    .load(stream_source)
)

In [0]:
# Streaming sink definition (not started here). Each micro-batch is handed to
# `ingestion` (the batch id is ignored), and progress is recorded in the
# checkpoint location. With the availableNow trigger, the query processes
# what is currently pending and then stops.
stream = (
    df_stream.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", stream_checkpoint)
    .foreachBatch(lambda batch_df, _batch_id: ingestion(batch_df))
)

In [0]:
# Launch the query, then block until the availableNow trigger has drained the
# backlog. awaitTermination() re-raises a failed batch's exception, so the
# cell (and any job running this notebook) fails instead of finishing silently.
start = stream.start()
start.awaitTermination()