In [607]:
import os
from pyspark.sql import SparkSession,HiveContext
from pyspark.sql.functions import explode,col,to_timestamp,udf,unix_timestamp,current_timestamp,from_unixtime,to_date
import shutil # util to copy from incremental folder to working folder
from datetime import date,datetime
from pyspark.sql.types import IntegerType,StringType,DateType,TimestampType 
import subprocess
from pyspark import SparkContext, HiveContext
from pyspark import SQLContext
from pyspark.sql.functions import udf
from datetime import datetime as dt
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number


# Configurations and Constents

In [608]:
#########  Reading from local #####################
# incremental_directory = "/Users/jpedapally/Documents/project/intute/source/incremental/"
# historic_directory    = "/Users/jpedapally/Documents/project/intute/source/historic/"
# archive_directory     = "/Users/jpedapally/Documents/project/intute/source/archive/"
# error_directory       = "/Users/jpedapally/Documents/project/intute/source/error/"
# working_directory     = "/Users/jpedapally/Documents/project/intute/source/working/"
# output_directory      = "/Users/jpedapally/Documents/project/intute/source/output/"
# temp_directory        = "/Users/jpedapally/Documents/project/intute/source/temp/"

# #########  Reading from HDFS #####################
incremental_directory = "hdfs://localhost:9000/source/incremental/"
historic_directory    = "hdfs://localhost:9000/source/historic/"
archive_directory     = "hdfs://localhost:9000/source/archive/"
error_directory       = "hdfs://localhost:9000/source/error/"
working_directory     = "hdfs://localhost:9000/source/working/"
output_directory      = "hdfs://localhost:9000/source/output/"
temp_directory        = "hdfs://localhost:9000/source/temp/"

# Spark initialization

In [609]:
spark = SparkSession\
.builder.master("localhost:9000")\
.appName("jdbc data sources")\
.config("spark.driver.bindAddress","127.0.0.1")\
.enableHiveSupport()\
.getOrCreate()  

# # User functions

In [610]:
def display_df(df):
    df.printSchema()
    df.show(5,False)

def read_directory(directory):
    ########## Reading all files in directory ########## 
    print ("=================================================================================================")
    print (f"Started reading '{directory}' files in working_directory {datetime.now()}")
    print ("=================================================================================================")
    df = spark.read.option("header","true").option("delimiter",",").option("inferschems","True").format("json").load(f"{directory}*.json")
    print (f"Read file {directory}")  
    print ("=================================================================================================")
    print (f"Total we read '{directory}' files in working_directory {datetime.now()}")
    print ("=================================================================================================") 
    return df


def remove_dups(df):
    return df.dropDuplicates(["Agentid","CallStartTimestamp","Channel","ContactId"]) 
    

    
def remove_missing_data(df):
    return df.where("Agentid IS NOT NULL AND CallStartTimestamp IS NOT NULL AND CallEndTimestamp IS NOT NULL")
    
    
def clean_df(df):
    df_exp = df.withColumn("AgentCallbackMessage", col("Attributes.AgentCallbackMessage"))\
    .withColumn("AgentHoldLoopDuration", col("Attributes.AgentHoldLoopDuration"))\
    .withColumn("CallRecordingEnabled", col("Attributes.CallRecordingEnabled"))\
    .withColumn("OfferQualityFeedback", col("Attributes.OfferQualityFeedback"))\
    .withColumn("QueueDuration", col("Attributes.QueueDuration"))\
    .withColumn("QueueName", col("Attributes.QueueName"))\
    .withColumn("QueueOverrideEnabled", col("Attributes.QueueOverrideEnabled"))\
    .withColumn("QueueType", col("Attributes.QueueType"))\
    .withColumn("Rating", col("Attributes.Rating")) 
    return df_exp


udf
def diff_time_in_sec(start_time,end_time):
    diff  = end_time - start_time 
    return diff.seconds

udf_diff_time_in_sec = udf(diff_time_in_sec,IntegerType())

def run_cmd(args_list):
        """
        run linux commands
        """
        # import subprocess
        print('Running system command: {0}'.format(' '.join(args_list)))
        proc = subprocess.Popen(args_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        s_output, s_err = proc.communicate()
        s_return =  proc.returncode
        return s_return, s_output, s_err 



# Read all files in working directory
# Check if we have multiple files and move them to working directory

In [611]:
try:
    ################ move incremental files to working_directory ########################
#     (ret, out, err)= run_cmd(['hdfs', 'dfs', '-mv', f'{incremental_directory}', f'{working_directory}'])

    
    
    ################ maintain the tract of files_names read ########################
    
    #### To check what all files are processed  #### 
    
    
    
    
    
    
    number_files_working_directory = 3
    ################ After moving to working dir cleaning the files ########################
    if number_files_working_directory > 0:
        df_incr = read_directory(incremental_directory)  #### Read hdfs files ####
        df_incr_cleaned = clean_df(df_incr)  
        df_incr_no_dups=remove_dups(df_incr_cleaned)
        df_incr_intr = remove_missing_data(df_incr_no_dups)
        df_incr_f = df_incr_intr.withColumn("recordAddedOn",current_timestamp())\
        .withColumn("recordAddedOn",current_timestamp())\
        .withColumn("recordUpdatedOn",current_timestamp())\
        .withColumn("CallStartTimestamp",to_timestamp("CallStartTimestamp"))\
        .withColumn("CallEndTimestamp",to_timestamp("CallEndTimestamp"))\
        .withColumn("partitionByDate",to_date("CallStartTimestamp"))\
        .where("Agentid IS NOT NULL AND CallStartTimestamp IS NOT NULL AND CallEndTimestamp IS NOT NULL AND Channel IS NOT NULL")


        df_incr_f0 = df_incr_f.withColumn("callDuration",udf_diff_time_in_sec((col("CallStartTimestamp")),(col("CallEndTimestamp"))))
        df_incr_f1 = df_incr_f0.coalesce(1)
        
        df_incr_final= df_incr_f1
        display_df(df_incr_final)
        
        df_incr_final.coalesce(1).write.mode("overwrite").parquet(f'{temp_directory}')
    
    
        
        

    
        
    else:
        print ("No files to process")
        exit(0)
except FileNotFoundError as e:
    print ("No files to process")
    print (repr(e))
except Except as e:
    print (repr(e))


Started reading 'hdfs://localhost:9000/source/incremental/' files in working_directory 2022-01-11 22:50:26.302340
Read file hdfs://localhost:9000/source/incremental/
Total we read 'hdfs://localhost:9000/source/incremental/' files in working_directory 2022-01-11 22:50:26.503413
root
 |-- Agentid: string (nullable = true)
 |-- Attributes: struct (nullable = true)
 |    |-- AgentCallbackMessage: string (nullable = true)
 |    |-- AgentHoldLoopDuration: string (nullable = true)
 |    |-- CallRecordingEnabled: string (nullable = true)
 |    |-- OfferQualityFeedback: string (nullable = true)
 |    |-- QueueDuration: string (nullable = true)
 |    |-- QueueName: string (nullable = true)
 |    |-- QueueOverrideEnabled: string (nullable = true)
 |    |-- QueueType: string (nullable = true)
 |    |-- Rating: string (nullable = true)
 |-- CallEndTimestamp: timestamp (nullable = true)
 |-- CallStartTimestamp: timestamp (nullable = true)
 |-- Channel: string (nullable = true)
 |-- ContactId: string

# Merg incr and Hist files
    

In [612]:
############ https://dwgeek.com/sql-merge-operation-using-pyspark-upsert-example.html/
df_incr_final.createOrReplaceTempView("incr_table")
df_hist_final.createOrReplaceTempView("hist_table_full")


################ historic df without incremental dates ########################
df_hist_table_without_inc_dates = spark.sql("select * from hist_table_full where partitionByDate not in (select distinct date(CallStartTimestamp) from incr_table) ")

################ historic df with incremental dates ########################
df_hist_table_with_inc_dates = spark.sql("select * from hist_table_full where partitionByDate in (select distinct date(CallStartTimestamp) from incr_table) ")


################ historic df with incremental dates UNION with incremental data ########################

df_hist_union_incr = df_hist_table_with_inc_dates.unionByName(df_incr_final)

df_merge = df_hist_union_incr.withColumn("_row_number", \
                                         row_number()\
                                         .over(Window\
                                               .partitionBy \
                                               (df_hist_union_incr['Agentid'],\
                                                df_hist_union_incr['CallStartTimestamp'],\
                                                df_hist_union_incr['Channel'])\
                                               .orderBy('recordUpdatedOn')))
# df_merge.printSchema()
# df_merge.where("_row_number > 1").show(100,False)


# Write the dataframe to hdfs

# One time load of historinc Data

In [578]:
# ################ historic df with incremental dates UNION with incremental data ########################

# # bin/hdfs dfs -ls /source/error/
# # bin/hdfs dfs -rm -r /source/error/
# # bin/hdfs dfs -put /Users/jpedapally/Documents/project/intute/source/historic/*.json /source/error/
# # bin/hdfs dfs -ls /source/error/
# # bin/hdfs dfs -get hdfs://localhost:9000/source/historic/ /Users/jpedapally/Documents/project/intute/source/historic/


# # https://stackoverflow.com/questions/53329250/difference-between-dates-in-pyspark-sql





# df_hist = read_directory(error_directory)  #### Read hdfs files ####
# df_hist_cleaned = clean_df(df_hist)  
# df_hist_no_dups=remove_dups(df_hist_cleaned)
# df_hist_intr = remove_missing_data(df_hist_no_dups)
# df_hist_f = df_hist_intr.withColumn("recordAddedOn",current_timestamp())\
# .withColumn("recordAddedOn",current_timestamp())\
# .withColumn("recordUpdatedOn",current_timestamp())\
# .withColumn("CallStartTimestamp",to_timestamp("CallStartTimestamp"))\
# .withColumn("CallEndTimestamp",to_timestamp("CallEndTimestamp"))\
# .withColumn("partitionByDate",to_date("CallStartTimestamp"))\
# .where("Agentid IS NOT NULL AND CallStartTimestamp IS NOT NULL AND CallEndTimestamp IS NOT NULL AND Channel IS NOT NULL")
 

# df_hist_final = df_hist_f.withColumn("callDuration",udf_diff_time_in_sec((col("CallStartTimestamp")),(col("CallEndTimestamp"))))
 
# display_df(df_hist_final) 

# df_hist_final.write.mode("overwrite").partitionBy("partitionByDate").parquet(f'{historic_directory}')



Started reading 'hdfs://localhost:9000/source/error/' files in working_directory 2022-01-10 01:04:03.898072
Read file hdfs://localhost:9000/source/error/
Total we read 'hdfs://localhost:9000/source/error/' files in working_directory 2022-01-10 01:04:03.987278
root
 |-- Agentid: string (nullable = true)
 |-- Attributes: struct (nullable = true)
 |    |-- AgentCallbackMessage: string (nullable = true)
 |    |-- AgentHoldLoopDuration: string (nullable = true)
 |    |-- CallRecordingEnabled: string (nullable = true)
 |    |-- OfferQualityFeedback: string (nullable = true)
 |    |-- QueueDuration: string (nullable = true)
 |    |-- QueueName: string (nullable = true)
 |    |-- QueueOverrideEnabled: string (nullable = true)
 |    |-- QueueType: string (nullable = true)
 |    |-- Rating: string (nullable = true)
 |-- CallEndTimestamp: timestamp (nullable = true)
 |-- CallStartTimestamp: timestamp (nullable = true)
 |-- Channel: string (nullable = true)
 |-- ContactId: string (nullable = true)

In [None]:
############### Big Query Analysis ################
# -----------------------------------------------------------------------
# -- Please write SQL queries to find answers of the following business questions from call_logs table
# -- a. What is the average length in seconds of all calls (i.e. all channels)
# -- received by each agent between Oct-15-2019 and Oct-30-2019?
# -----------------------------------------------------------------------
# select AVG(callDuration)  from `projectn-281322.calldata.call_data_historic`
# where date(CallStartTimestamp) between '2019-10-15' AND '2019-10-30'
# AND InitiationMethod = 'INBOUND'; 
# -----------------------------------------------------------------------
# -- b. Get total number of calls served by each agent per channel group.
# -----------------------------------------------------------------------
# select Agentid,Channel,InitiationMethod, count(concat(CallEndTimestamp,	CallStartTimestamp)) number_of_call  
# from `projectn-281322.calldata.call_data_historic`
# group by 1,2,3 order by 1,2,3;
# -----------------------------------------------------------------------
# -- c. Show top 5 agents based on number of calls per month.
# -----------------------------------------------------------------------
# select extract(month from CallEndTimestamp) month,Agentid,Channel,InitiationMethod, count(concat(CallEndTimestamp,	CallStartTimestamp))  
# from `projectn-281322.calldata.call_data_historic`
# group by 1,2,3,4 order by 1,2,3;


# pseudo code load redshift from s3

In [6]:
# pseudo code load redshift from s3

# https://dwgeek.com/how-to-export-spark-dataframe-to-redshift-table.html/

export CLASSPATH=$PWD/RedshiftJDBC42-1.1.17.1017.jar

pyspark --conf spark.executor.extraClassPath=/path/RedshiftJDBC42-1.1.17.1017.jar --driver-class-path /path/RedshiftJDBC42-1.1.17.1017.jar --jars /path/RedshiftJDBC42-1.1.17.1017.jar

testDf.write.format('jdbc').options(
      url='jdbc:redshift://testredshift.us-east-1.redshift.amazonaws.com:5439/dev',	  
      driver='com.amazon.redshift.jdbc42.Driver',
      dbtable='public.df_load_test',
      user='redshiftuser',
      password='Password').mode('append').save() 

testDf.write.format('jdbc').options(
      url='jdbc:redshift://testredshift.us-east-1.redshift.amazonaws.com:5439/dev',	  
      driver='com.amazon.redshift.jdbc42.Driver',
      dbtable='public.df_load_test',
      user='redshiftuser',
      password='Password').mode('overwrite').save() 



# Create partition table , https://docs.aws.amazon.com/redshift/latest/dg/c-spectrum-external-tables.html
create external table spectrum.sales_part(
salesid integer,
listid integer,
sellerid integer,
buyerid integer,
eventid integer,
dateid smallint,
qtysold smallint,
pricepaid decimal(8,2),
commission decimal(8,2),
saletime timestamp)
partitioned by (salesmonth char(10), event integer)
row format delimited
fields terminated by '|'
stored as parquet 
location 's3://awssampledbuswest2/tickit/spectrum/sales_partition/'
table properties ('numRows'='172000');

# Drop partitions
alter table spectrum.sales_part
drop partition(saledate='2008-01-01');

# Add partitions
alter table spectrum.sales_part add
partition(saledate='2008-01') 
location 's3://awssampledbuswest2/tickit/spectrum/sales_partition/saledate=2008-01/'

partition(saledate='2008-02') 
location 's3://awssampledbuswest2/tickit/spectrum/sales_partition/saledate=2008-02/'

partition(saledate='2008-03') 
location 's3://awssampledbuswest2/tickit/spectrum/sales_partition/saledate=2008-03/';

SyntaxError: invalid syntax (<ipython-input-6-6f9d45d48f06>, line 3)

# Useful infromation 

In [None]:
# https://hadoop.apache.org/docs/r3.1.1/hadoop-project-dist/hadoop-common/ClusterSetup.html
# https://techblost.com/how-to-install-hive-on-mac-with-homebrew/
    
    

In [240]:
############## Below commands if you are running from local #########################
# try:
#     ################ Move incr to working (mv incr to working) ########################
#     number_files_incremental_directory = 0
#     incremental_directory_files = []
#     incremental_directory_files = [path for path in os.listdir(incremental_directory) if os.path.isfile(os.path.join(incremental_directory, path))]
#     number_files_incremental_directory = len(incremental_directory_files)

    
#     print (f"We have {incremental_directory_files}  '{number_files_incremental_directory}' files in {incremental_directory} ")
    
#     if number_files_incremental_directory > 0 :
#         allfiles = os.listdir(incremental_directory)
#         for f in allfiles:
#             shutil.move(incremental_directory + f, working_directory + f)
#         print (f"Moved '{number_files_incremental_directory}' files from incremental_directory to working_directory")
    
#     number_files_working_directory = 0
#     working_directory_files = []
#     working_directory_files = [path for path in os.listdir(working_directory) if os.path.isfile(os.path.join(working_directory, path))]
#     number_files_working_directory = len(working_directory_files) 
            
#     print (f"We have {working_directory_files} '{number_files_working_directory}' files in {incremental_directory} ")    
    
#     ################ After moving to working dir cleaning the files ########################
#     if number_files_working_directory > 0:
#         df_incr = read_directory(working_directory)
#         df_incr_cleaned = clean_df(df_incr)
# #         df_incr_final = df_incr_cleaned.withColumn("callDuration",udf_diff_time_in_sec(df_incr_cleaned["CallStartTimestamp"],df_incr_cleaned["CallEndTimestamp"]).cast(IntegerType()))
        
        
#     else:
#         print ("No files to process")
#         exit(0)
# except FileNotFoundError as e:
#     print ("No files to process")
#     print (repr(e))
# except Except as e:
#     print (repr(e))



No files to process
FileNotFoundError(2, 'No such file or directory')


In [266]:
######## Start Hadoop  #############
# $HADOOP_HOME/bin/hdfs --daemon start namenode 
# http://localhost:9870/dfshealth.html#tab-overview
# /usr/local/Cellar/hadoop/3.3.1/libexec

######## Hadoop hdfs create directory/folder  #############
# bin/hdfs dfs -mkdir /source/incremental/
# bin/hdfs dfs -mkdir /source/historic/
# bin/hdfs dfs -mkdir /source/archive/
# bin/hdfs dfs -mkdir /source/error/
# bin/hdfs dfs -mkdir /source/working/
# bin/hdfs dfs -mkdir /source/output/

######## copy data from local to hdfs  #############
# hadoop fs -put /Users/jpedapally/Documents/project/intute/source/working/call_logs_data_202112305750.json /user/hadoop/hadoopfile.csv
# bin/hdfs dfs -put /Users/jpedapally/Documents/project/intute/source/working/call_logs_data_202112305750.json /source/working/
# bin/hdfs dfs -put /Users/jpedapally/Documents/project/intute/source/historic/*.json /source/error/

######## check files exist in HDFS  #############
# bin/hdfs dfs -ls /source/


######## Stop Hadoop  #############
# $HADOOP_HOME/bin/hdfs --daemon stop namenode
# $HADOOP_HOME/bin/hdfs --daemon stop datanode
# $HADOOP_HOME/sbin/stop-dfs.sh
# $HADOOP_HOME/bin/yarn --daemon stop resourcemanager
# $HADOOP_HOME/bin/yarn --daemon stop nodemanager
# $HADOOP_HOME/sbin/stop-yarn.sh
# $HADOOP_HOME/bin/yarn stop proxyserver
# $HADOOP_HOME/bin/mapred --daemon stop historyserver

######## useful links for inital setup  #############
#https://hadoop.apache.org/docs/r3.1.1/hadoop-project-dist/hadoop-common/ClusterSetup.html
# https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html#Fully-Distributed_Operation

In [None]:
###### To check how many files exists ##########
# import subprocess
# df = spark.read.option("header","true").option("delimiter",",").option("inferschems","True").format("json").load(f"{incremental_directory}*.json")
# df.printSchema()
# def run_cmd(args_list):
#         """
#         run linux commands
#         """
#         # import subprocess
#         print('Running system command: {0}'.format(' '.join(args_list)))
#         proc = subprocess.Popen(args_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
#         s_output, s_err = proc.communicate()
#         s_return =  proc.returncode
#         return s_return, s_output, s_err 
# (ret, out, err)= run_cmd(['hdfs', 'dfs', '-count', f'{working_directory}'])
# lines = out.split()
# print (ret)
# print(out)
# print(err)
# print("---------------")
# print(lines)

# directory = "/Volumes/GoogleDrive/My Drive/Training/Pyspark/Excersise/intute/source/incremental"
# test_file = [path for path in os.listdir(directory) if os.path.isfile(os.path.join(directory, path))]
# print (test_file)

In [None]:
############ Hive commands ############


# show databases;

# create database call_data_db;

# show databases;

# use call_data_db;


# CREATE EXTERNAL TABLE call_data_db.call_data_historic_daily 
# ( 
# Agentid                          string,          
# Attributes                       struct        
# <AgentCallbackMessage     :  string,            
#  AgentHoldLoopDuration    :  string,             
#  CallRecordingEnabled     :  string,             
#  OfferQualityFeedback     :  string,             
#  QueueDuration            :  string,             
#  QueueName                :  string,             
#  QueueOverrideEnabled     :  string,             
#  QueueType                :  string,             
#  Rating                   :  string> ,           
# CallEndTimestamp                 timestamp,       
# CallStartTimestamp               timestamp,       
# Channel                          string,          
# ContactId                        string,          
# InitialContactId                 string,          
# InitiationMethod                 string,          
# NextContactId                    string,          
# PreviousContactId                string,          
# AgentCallbackMessage             string,          
# AgentHoldLoopDuration            string,          
# CallRecordingEnabled             string,          
# OfferQualityFeedback             string,          
# QueueDuration                    string,          
# QueueName                        string,          
# QueueOverrideEnabled             string,          
# QueueType                        string,          
# Rating                           string,          
# callDuration                     integer           
# )  PARTITIONED BY (partitionByDate date)
# row format delimited fields terminated by ',' 
# stored as parquet ;


# LOAD DATA INPATH 'hdfs://localhost:9000/source/historic/' overwrite into table call_data_db.call_data_historic_daily;


In [None]:
# ##################### Interacting with Hadoop HDFS using Python codes ##################### 

# # https://community.cloudera.com/t5/Community-Articles/Interacting-with-Hadoop-HDFS-using-Python-codes/ta-p/245163




# Run Hadoop ls command in Python
# (ret, out, err)= run_cmd(['hdfs', 'dfs', '-ls', 'hdfs_file_path'])
# lines = out.split('\n')


# Run Hadoop get command in Python
# (ret, out, err)= run_cmd(['hdfs', 'dfs', '-get', 'hdfs_file_path', 'local_path'])


# Run Hadoop put command in Python
# (ret, out, err)= run_cmd(['hdfs', 'dfs', '-put', 'local_file', 'hdfs_file_path'])


# Run Hadoop copyFromLocal command in Python
# (ret, out, err)= run_cmd(['hdfs', 'dfs', '-copyFromLocal', 'local_file', 'hdfs_file_path'])
                            
# Run Hadoop copyToLocal command in Python
# (ret, out, err)= run_cmd(['hdfs', 'dfs', '-copyToLocal', 'hdfs_file_path', 'local_file'])


# hdfs dfs -rm -skipTrash /path/to/file/you/want/to/remove/permanently
# Run Hadoop remove file command in Python
# (ret, out, err)= run_cmd(['hdfs', 'dfs', '-rm', 'hdfs_file_path'])
# (ret, out, err)= run_cmd(['hdfs', 'dfs', '-rm', '-skipTrash', 'hdfs_file_path'])


# rm -r
# HDFS Command to remove the entire directory and all of its content from HDFS.
# Usage: hdfs dfs -rm -r <path>
    
# (ret, out, err)= run_cmd(['hdfs', 'dfs', '-rm', '-r', 'hdfs_file_path'])
# (ret, out, err)= run_cmd(['hdfs', 'dfs', '-rm', '-r', '-skipTrash', 'hdfs_file_path'])




# Check if a file exist in HDFS
# Usage: hadoop fs -test -[defsz] URI


# Options:


# -d: f the path is a directory, return 0.
# -e: if the path exists, return 0.
# -f: if the path is a file, return 0.
# -s: if the path is not empty, return 0.
# -z: if the file is zero length, return 0.





# Spark UI description Job,Stage,

In [None]:
# https://sparkbyexamples.com/spark/spark-web-ui-understanding/

<!-- ###### //Transformation
# df = spark.read.option("inferSchema",True).option("header",True).csv(path = "/path/for/the/file")
# //Action
# df.count() -->

-------------------- 
Number of Spark Jobs:
--------------------
The number of Spark jobs is equal to the number of actions in the application and each Spark job should have at least one Stage.
In our above application, we have performed 3 Spark jobs (0,1,2)

Job 0. read the CSV file.
Job 1. Inferschema from the file.
Job 2. Count Check



--------------------
Number of Stages:
--------------------
Each Wide Transformation results in a separate Number of Stages. 
In our case, Spark job0 and Spark job1 have individual single stages but when 
it comes to Spark job 3 we can see two stages that are because of the partition of data. 
Data is partitioned into two files by default.






