In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from delta.tables import *
import json

In [None]:
# Source system and destination zone identifiers for this ingestion job.
SOURCE, DEST = "mongo", "landing"

In [None]:
import os

# MongoDB connection settings.
# The connection string carries credentials, so it is read from the environment
# instead of being written into the notebook; the placeholder is used only when
# MONGODB_CONNECTION_STRING is unset.
connectionString = os.environ.get(
    "MONGODB_CONNECTION_STRING", "mongodb+srv://CONNECTION_STRING_HERE/"
)
database = "sample_supplies"

In [None]:
def child_struct(nested_df):
    """Flatten every struct column of ``nested_df`` (recursively) into top-level columns.

    Each leaf field is selected by its full path and renamed by joining the path
    segments with ``_`` (e.g. ``customer.age`` -> ``customer_age``). Columns that
    are not structs (including arrays) are kept as-is; arrays are not exploded here.

    Args:
        nested_df: Spark DataFrame to flatten.

    Returns:
        A new DataFrame whose top-level columns contain no struct types.
    """

    def _quote(name):
        # Backtick-quote one identifier so dots/spaces inside a field name are not
        # parsed as nested-field access; literal backticks are escaped by doubling.
        return "`" + name.replace("`", "``") + "`"

    # Work stack of (path to a struct, DataFrame projecting that struct's fields).
    pending = [((), nested_df)]
    flat_columns = []

    while pending:
        parents, df = pending.pop()
        struct_names = []
        for name, dtype in df.dtypes:
            path = parents + (name,)
            if dtype.startswith("struct"):
                struct_names.append(name)
            else:
                # Address the leaf from the root frame via its quoted path.
                quoted_path = ".".join(_quote(part) for part in path)
                flat_columns.append(col(quoted_path).alias("_".join(path)))
        for name in struct_names:
            # "<struct>.*" expands the struct's fields so they can be inspected next.
            pending.append((parents + (name,), df.select(_quote(name) + ".*")))

    return nested_df.select(flat_columns)

def master_array(df):
    """Fully flatten ``df``: explode every array column and flatten every struct column.

    Structs are flattened with ``child_struct`` and arrays are exploded with
    ``explode_outer`` (rows whose array is null or empty are kept with a null
    value). The two steps repeat until no array column remains, so arrays of
    structs and structs containing arrays are both handled.

    Note: all array columns found in one pass are exploded one after another, so
    several independent arrays produce the cartesian product of their elements.

    Args:
        df: Spark DataFrame to flatten.

    Returns:
        A DataFrame with no top-level array or struct columns.
    """

    def _array_columns(frame):
        # dtypes are (name, simpleString) pairs; array types render as "array<...>".
        return [name for name, dtype in frame.dtypes if dtype.startswith("array")]

    def _quote(name):
        # Flattened names may contain dots from the source field names; quote them
        # so explode_outer treats each as one column rather than a nested path.
        return "`" + name.replace("`", "``") + "`"

    # Flatten first so arrays nested inside structs surface as top-level columns
    # and structs are flattened even when no array is present.
    flat_df = child_struct(df)
    array_cols = _array_columns(flat_df)
    while array_cols:
        for name in array_cols:
            flat_df = flat_df.withColumn(name, explode_outer(col(_quote(name))))
        # Exploded elements may themselves be structs; flatten them before re-scanning.
        flat_df = child_struct(flat_df)
        array_cols = _array_columns(flat_df)
    return flat_df

In [None]:
def ingest_data(df, table, pk_col, partition, mode):
    """Write ``df`` into the Delta table ``table`` using the given write mode.

    Every row gets an ``INGESTION_TIME`` column holding the driver-side timestamp
    taken when this function is called.

    Args:
        df: Spark DataFrame to ingest.
        table: Target table name, passed to ``saveAsTable`` / ``DeltaTable.forName``.
        pk_col: Column used to match source and target rows in ``"merge"`` mode;
            not used by the other modes.
        partition: Column name, or list/tuple of column names, passed to
            ``partitionBy`` in ``"append"`` / ``"overwrite"`` mode. ``None`` or an
            empty value writes without partitioning. Not used in ``"merge"`` mode.
        mode: ``"append"``, ``"overwrite"`` or ``"merge"`` (case-insensitive).

    Raises:
        ValueError: if ``mode`` is not one of the supported values.
    """
    # Neither `datetime` nor `F` is imported elsewhere in this notebook.
    from datetime import datetime
    from pyspark.sql import functions as F

    mode = mode.lower()
    if mode not in ("append", "overwrite", "merge"):
        # Fail loudly rather than silently writing nothing.
        raise ValueError(
            f"Unsupported mode {mode!r}; expected 'append', 'overwrite' or 'merge'"
        )

    df = df.withColumn("INGESTION_TIME", F.lit(datetime.now()))

    if mode == "merge":
        deltaTable = DeltaTable.forName(spark, table)
        print(f"Merge table: {table}")
        (
            deltaTable.alias("target")
            .merge(df.alias("update"), f"target.{pk_col} = update.{pk_col}")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()  # the merge builder does nothing until execute() is called
        )
        return

    writer = df.write.format("delta").mode(mode)
    if mode == "append":
        print(f"Append table: {table}")
        # Let columns that are new in df be added to the table schema.
        writer = writer.option("mergeSchema", "true")
    else:
        print(f"Overwrite table: {table}")
        # Replace the table schema with df's schema.
        writer = writer.option("overwriteSchema", "true")

    if not partition:
        partition_cols = []
    elif isinstance(partition, str):
        partition_cols = [partition]
    else:
        partition_cols = list(partition)
    if partition_cols:
        writer = writer.partitionBy(*partition_cols)

    writer.saveAsTable(table)

In [None]:
collection = "sales"

# Aggregation executed inside MongoDB: keep orders that contain printer paper,
# emit one document per line item, and compute price * quantity for that item.
# NOTE(review): `$match` runs before `$unwind`, so every item of a matching order
# (not only printer paper) yields a row; add a second `$match` after `$unwind`
# if only printer-paper sales are intended.
pipeline = json.dumps([
    {"$match": {"items.name": "printer paper"}},
    {"$unwind": {"path": "$items"}},
    {"$addFields": {"totalSale": {"$multiply": ["$items.price", "$items.quantity"]}}},
    {"$project": {"saleDate": 1, "totalSale": 1, "_id": 0}},
])

# format("mongodb") is the MongoDB Spark Connector 10.x data source; its read
# options are `connection.uri`, `aggregation.pipeline` and a fully-qualified
# `partitioner` class name.
salesDF = (
    spark.read.format("mongodb")
    .option("connection.uri", connectionString)
    .option("database", database)
    .option("collection", collection)
    .option("aggregation.pipeline", pipeline)
    .option(
        "partitioner",
        "com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner",
    )
    .load()
)
display(salesDF)

In [None]:
# Run the array-explode / struct-flatten pass from master_array before ingesting.
salesDF = master_array(df=salesDF)

In [None]:
# NOTE(review): the read pipeline's `$project` keeps only `saleDate` and
# `totalSale`, so `city` is not a column of salesDF and partitioning by it will
# fail; choose an existing column (or project it in the pipeline). `pk_col` is
# only used for merges, so `_id` being projected away does not matter for append.
ingest_data(salesDF, table=collection, pk_col="_id", partition="city", mode="append")