# Data Engineering Architecture for Large-Scale Data Processing – Part I

## Import Spark and Supporting Libraries

In [None]:
import time
import os
import sys
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
from glob import glob
import pandas as pd
import numpy as np

from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import *

import databricks.koalas as ks

## Spark Session Builder

In [None]:
# Build (or reuse) the notebook's Spark session with Hive support enabled.
spark = (
    SparkSession.builder
    .appName("DevIngestApp")
    .config("spark.port.maxRetries", 20)
    .enableHiveSupport()
    .getOrCreate()
)

# Disable the Hadoop output committer's _SUCCESS marker files.
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
# ABFS account key for the `devstore` storage account, read from the Databricks
# secret scope rather than hard-coded in the notebook.
spark.conf.set(
    "fs.azure.account.key.devstore.dfs.core.windows.net",
    dbutils.secrets.get(scope="main-scope", key="main-key"),
)

# Keep a handle on the SparkContext and only surface ERROR-level log output.
sc = spark.sparkContext
sc.setLogLevel("ERROR")

In [None]:
### Environment Variables
# Date stamps for this run. The clock is read once so both stamps always describe
# the same moment; two separate datetime.today() calls could straddle midnight
# (or a month boundary) and produce stamps that disagree.
_run_now = datetime.today()
DATESTAMP = _run_now.strftime("%Y%m%d")   ## format = '20220301' for March 1, 2022
MONTHSTAMP = _run_now.strftime('%Y%m')    ## format = '202203'   for March 1, 2022

## Read Data

In [None]:
def read_data(read_path, file_name):
    """Load a CSV source into a Spark DataFrame using a fixed, explicit schema.

    Args:
        read_path: Path of the CSV file (or directory of CSV files) to read.
        file_name: Label used only in the record-count log line; anything
            from the first '.' onward is dropped before printing.

    Returns:
        A DataFrame whose columns and types are those declared in
        ``read_schema`` below.
    """
    # Explicit schema: avoids an inference pass and gives every source file the
    # same column names/types. The header row is skipped (header="true").
    # Type names must be the real pyspark.sql.types classes: IntegerType and
    # BooleanType (there is no IntType/BoolType, which raised NameError).
    read_schema = StructType([
                    StructField("identifier", IntegerType(), True),
                    StructField("news_head", StringType(), True),
                    StructField("draft_date", DateType(), True),
                    StructField("air_date", DateType(), True),
                    StructField("views", DoubleType(), True),
                    StructField("comments", StringType(), True),
                    StructField("mentions", StringType(), True),
                    StructField("views_external", DoubleType(), True),
                    StructField("status", BooleanType(), True),
                    StructField("headline", BooleanType(), True),
                    StructField("src_file_rec", IntegerType(), True),
                    StructField("src_file_name", StringType(), True)
                    ])

    df = spark.read.load(read_path, format="csv", sep=",", schema=read_schema, header="true")
    file_name = file_name.split('.')[0]
    # NOTE: count() is an action; it scans the whole input just for this log line.
    print("\nFile:", file_name, "Recs: ", df.count())

    return df


## Main

In [None]:
# Ingest every entry under /mnt/<stage>/<flow>/ and hand each one to the
# validation notebook.
stage_name = "Source"
flow_name = "DailyNews"
source_path = f"/mnt/{stage_name}/{flow_name}/"
files = dbutils.fs.ls(source_path)
processed_files = []
processed_date = datetime.now().strftime('%Y-%m-%d')
fdebug = False
verbose = 0
# Seconds to wait for each validation notebook run; 0 means no timeout.
validation_timeout_seconds = 0
for src in files:
    # NOTE(review): init_df is not used in the visible lines; kept in case
    # later code in this loop relies on it.
    init_df = spark.createDataFrame(data=[], schema=StructType([]))
    # Logical source name: file stem up to the first '_'.
    file_name = (src.name.split('.')[0]).split('_')[0]
    src_df = read_data(src.path, file_name)
    # dbutils.notebook.run(path, timeout_seconds, arguments) only accepts a
    # map of string arguments, so a DataFrame cannot be passed directly (the
    # original call passed it in the timeout position). Publish it as a global
    # temp view, which the child notebook can read, and pass the view name.
    # The "src_" prefix keeps the identifier valid even for all-digit stems.
    view_name = "src_" + "".join(ch if ch.isalnum() else "_" for ch in file_name)
    src_df.createOrReplaceGlobalTempView(view_name)
    validation_result = dbutils.notebook.run(
        "ExecuteValidationsNB",
        validation_timeout_seconds,
        {
            # NOTE(review): these keys must match the widgets that
            # ExecuteValidationsNB reads — confirm against that notebook.
            "view_name": f"global_temp.{view_name}",
            "source_path": src.path,
            "file_name": file_name,
            "processed_date": processed_date,
        },
    )
    #################################################################################
    