**     Silver Layer: Clean, Join, and Store as Delta Tables
**

In [6]:
import re
from pyspark.sql.functions import col,current_timestamp,to_date

StatementMeta(, ead5627d-7984-449c-9339-7d7b72894f43, 8, Finished, Available, Finished)

In [7]:
def clean_columns_df(df):
    new_columns=[]
    for column_name in df.columns:
        clean_columns= column_name.lower().replace("-","_").replace(" ","_")
        clean_columns= re.sub(r'[^\w]', ' ', clean_columns)               
        new_columns.append(clean_columns)
    return df.toDF(*new_columns)

StatementMeta(, ead5627d-7984-449c-9339-7d7b72894f43, 9, Finished, Available, Finished)

In [8]:
assingments_df=spark.read.format('csv')\
                    .option("header","true")\
                    .option("inferSchema",'true')\
                    .load("Files/bronze/Assignments.csv")

courses_df=spark.read.format('csv')\
                 .option("header","true")\
                 .option("inferSchema",'true')\
                 .load("Files/bronze/Courses.csv")

students_df=spark.read.format('csv')\
                  .option("header","true")\
                  .option("inferSchema","true")\
                  .load("Files/bronze/Students.csv")

submissions_df=spark.read.format('csv')\
                    .option("header","true")\
                    .option("inferSchema",'true')\
                    .load("Files/bronze/Submissions.csv")

StatementMeta(, ead5627d-7984-449c-9339-7d7b72894f43, 10, Finished, Available, Finished)

In [9]:
assingments_df=clean_columns_df(assingments_df)
courses_df=clean_columns_df(courses_df)
students_df=clean_columns_df(students_df)
submissions_df=clean_columns_df(submissions_df)

StatementMeta(, ead5627d-7984-449c-9339-7d7b72894f43, 11, Finished, Available, Finished)

In [10]:
assingments_df=assingments_df.withColumn("due_date",to_date("due_date","yyyy-MM-dd"))
students_df=students_df.withColumn("enrollment_date",to_date("enrollment_date","yyyy-MM-dd"))
submissions_df=submissions_df.withColumn("submission_date",to_date("submission_date","yyyy-MM-dd"))

StatementMeta(, ead5627d-7984-449c-9339-7d7b72894f43, 12, Finished, Available, Finished)

In [11]:
assingments_df=assingments_df.withColumn("ingest_time", current_timestamp())
students_df=students_df.withColumn("ingest_time",current_timestamp())
submissions_df=submissions_df.withColumn("ingest_time",current_timestamp())
courses_df=courses_df.withColumn("ingest_time",current_timestamp())

StatementMeta(, ead5627d-7984-449c-9339-7d7b72894f43, 13, Finished, Available, Finished)

In [13]:
assingments_df.write.format("delta").mode("overwrite").saveAsTable("silver_assingments")
students_df.write.format("delta").mode("overwrite").saveAsTable("silver_students")
courses_df.write.format("delta").mode("overwrite").saveAsTable("silver_courses")
submissions_df.write.format("delta").mode("overwrite").saveAsTable("silver_submissions")

StatementMeta(, ead5627d-7984-449c-9339-7d7b72894f43, 15, Finished, Available, Finished)

In [3]:
table_name='silver_assignment_courses'
df=spark.sql("""
 select
        assignment_id,
        assignment_name,
        assign.course_id,
        course_name,
        instructor_name,
        due_date 
    FROM silver_assingments assign
    JOIN silver_courses course
        ON assign.course_id = course.course_id
        """)

df.write.format("delta").mode("overWrite").saveAsTable(table_name)


StatementMeta(, 4f8adf17-38e1-47c9-880d-a1d4a02181f2, 5, Finished, Available, Finished)

In [4]:
table_name='silver_students_submissions'
df=spark.sql("""
     SELECT 
        student.student_id,
        full_name,
        email_address,
        enrollment_date,
        submission_id,
        assignment_id,
        submission_date
    FROM silver_students student
    JOIN silver_submissions sub
        ON student.student_id = sub.student_id 
        """)

df.write.format("delta").mode("overWrite").saveAsTable(table_name)


StatementMeta(, 4f8adf17-38e1-47c9-880d-a1d4a02181f2, 6, Finished, Available, Finished)

In [14]:
assingments_df.printSchema()
courses_df.printSchema()
students_df.printSchema()
submissions_df.printSchema()

StatementMeta(, ead5627d-7984-449c-9339-7d7b72894f43, 16, Finished, Available, Finished)

root
 |-- assignment_id: string (nullable = true)
 |-- course_id: string (nullable = true)
 |-- assignment_name: string (nullable = true)
 |-- due_date: date (nullable = true)
 |-- ingest_time: timestamp (nullable = false)

root
 |-- course_id: string (nullable = true)
 |-- course_name: string (nullable = true)
 |-- instructor_name: string (nullable = true)
 |-- ingest_time: timestamp (nullable = false)

root
 |-- student_id: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- email_address: string (nullable = true)
 |-- enrollment_date: date (nullable = true)
 |-- ingest_time: timestamp (nullable = false)

root
 |-- submission_id: string (nullable = true)
 |-- assignment_id: string (nullable = true)
 |-- student_id: string (nullable = true)
 |-- submission_date: date (nullable = true)
 |-- submitted_content: string (nullable = true)
 |-- ingest_time: timestamp (nullable = false)

