In [13]:
### Packages and dependencies

import os
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import boto3
from botocore.exceptions import NoCredentialsError
from os import listdir
from os.path import join as path_join
from pyspark.sql.functions import year, month, dayofmonth

In [14]:
## Environment variables and JARS

# Runtime configuration is read from the process environment; each value is
# None when the corresponding variable is not set.
ENVIRON = os.environ.get('ENVIRON')
BUCKET = os.environ.get('BUCKET')
access_key = os.environ.get('aws_access_key_id')
secret_key = os.environ.get('aws_secret_access_key')

# Directory holding the driver JARs, relative to the current working directory.
driver_jars_path = path_join("..", "emr", "jar")
print(driver_jars_path)

# Keep only *.jar entries so that stray files (e.g. .DS_Store, README) never end
# up in `spark.jars`, and sort them because listdir() returns entries in
# arbitrary order.
# NOTE(review): the saved listing shows two versions each of hadoop-aws and
# mssql-jdbc in this directory; loading both may conflict on the classpath —
# confirm which versions are intended.
all_jars = sorted(jar for jar in listdir(driver_jars_path) if jar.endswith(".jar"))
print(all_jars)

all_pathed_jars = [path_join(driver_jars_path, jar) for jar in all_jars]
# Comma-separated list, later passed to the `spark.jars` setting.
jars_paths = ",".join(all_pathed_jars)

../emr/jar
['hadoop-aws-3.3.4.jar', 'hadoop-aws-3.3.6.jar', 'mssql-jdbc-10.2.0.jre8.jar', 'mssql-jdbc-12.2.0.jre8.jar', 'spark-mssql-connector-1.0.2.jar', 'sqljdbc4-2.0.jar']


In [15]:
## Environment preparation

# Report whether the credentials are configured WITHOUT echoing their values:
# cell outputs are saved with the notebook, so printing a key leaks it to anyone
# who can read the file or its history.
# NOTE(review): key values previously printed by this cell were saved in the
# notebook output; treat them as compromised and rotate them.
if ENVIRON == 'DEV':
    print(f'The environment is {ENVIRON}')
    ##os.environ.setdefault('AWS_PROFILE', 'itvgithub')
    for label, value in (("access_key", access_key), ("secret_key", secret_key)):
        # A truthiness check also avoids the TypeError that `"..." + None`
        # raised when a variable was not set.
        print(f"{label}: {'set' if value else 'MISSING'}")

The environment is DEV
access_key: <REDACTED>
secret_key: <REDACTED>


In [16]:
## Creating Spark Session

# Settings applied to the session builder, in this order: local driver JARs,
# the hadoop-aws package, and static S3A credentials read from the environment.
spark_settings = {
    'spark.jars': jars_paths,
    'spark.jars.packages': 'org.apache.hadoop:hadoop-aws:3.3.6',
    'spark.hadoop.fs.s3a.aws.credentials.provider': 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider',
    'spark.hadoop.fs.s3a.access.key': access_key,
    'spark.hadoop.fs.s3a.secret.key': secret_key,
}

spark_builder = SparkSession.builder.appName("Data Transformation")
for setting_name, setting_value in spark_settings.items():
    spark_builder = spark_builder.config(setting_name, setting_value)

# Returns the already-running session if one exists, otherwise starts a new one.
spark = spark_builder.getOrCreate()

In [17]:
## Bronze layer schema validation

# Location of the bronze layer inside the bucket, taken from the environment.
BRONZE = os.environ.get('BRONZE')
bronze_location = f"s3a://{BUCKET}/{BRONZE}"

# Read the bronze JSON and print the resulting schema for inspection.
df = spark.read.json(bronze_location)
df.printSchema()

                                                                                

root
 |-- _corrupt_record: string (nullable = true)
 |-- actor: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- display_login: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- org: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- before: string (nullable = true)
 |    |-- comment: struct (nullable = true)
 |    |    |-- _links: struct (nullable = true)
 |    |    |    |-- html: struct (nullable = true)
 |    |    |    |    |-- href: string (nullable = true)

In [None]:
# Derive the date-part columns used for the silver layer

# Silver-layer location from the environment (None when unset).
SILVER = os.environ.get('SILVER')

# Add the date parts directly on the bronze frame. The previous `df.select()`
# (no arguments) projected zero columns, so `created_at` could not be resolved
# by the withColumn calls that followed it.
# `created_at` is a string in the printed bronze schema; year/month/dayofmonth
# rely on Spark casting it to a date — TODO confirm the timestamp format parses.
dfSilver = (
    df.withColumn("year", year("created_at"))
      .withColumn("month", month("created_at"))
      .withColumn("dayofmonth", dayofmonth("created_at"))
)

# NOTE(review): only the three derived columns are kept, so every original
# bronze field is dropped from what gets written — confirm this is intended.
dfWrite = dfSilver.select("year", "month", "dayofmonth")
dfWrite.printSchema()

In [None]:
## Preview of the rows that will be written
# Same as the bare show(): first 20 rows, long values truncated.
dfWrite.show(n=20, truncate=True)

In [None]:
# Persist the frame as Parquet, replacing any existing output at the destination.
# The destination follows the same BUCKET/<layer> convention as the bronze read
# instead of a hard-coded path; the original literal is kept as the fallback
# when either environment value is unset.
# NOTE(review): confirm BUCKET/SILVER resolve to the intended location before
# running — mode("overwrite") replaces whatever is already there.
if BUCKET and SILVER:
    silver_path = f"s3a://{BUCKET}/{SILVER}"
else:
    silver_path = "s3a://gh-data-project/silver"
dfWrite.write.mode("overwrite").format("parquet").save(silver_path)

In [None]:
# Shut down the Spark session created earlier in the notebook.
spark.stop()