In [289]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

In [290]:
# Get (or reuse) a Hive-enabled Spark session named 'Analyse'.
spark = (
    SparkSession.builder
    .appName('Analyse')
    .enableHiveSupport()
    .getOrCreate()
)

25/03/15 00:34:53 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [291]:
# Display the active SparkSession object.
spark

In [292]:
# Load the CSV with its first row as the header. No schema is inferred here,
# so every column is read as a string and must be cast before numeric use.
df_pyspark = spark.read.csv('big4_financial_risk_compliance.csv', header=True)

In [293]:
# Confirm the reader returned a Spark DataFrame.
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [294]:
# Show the DataFrame repr: column names with their types.
df_pyspark

DataFrame[Year: string, Firm_Name: string, Total_Audit_Engagements: string, High_Risk_Cases: string, Compliance_Violations: string, Fraud_Cases_Detected: string, Industry_Affected: string, Total_Revenue_Impact: string, AI_Used_for_Auditing: string, Employee_Workload: string, Audit_Effectiveness_Score: string, Client_Satisfaction_Score: string]

In [295]:
# Print the schema as a tree (column name, type, nullability).
df_pyspark.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Firm_Name: string (nullable = true)
 |-- Total_Audit_Engagements: string (nullable = true)
 |-- High_Risk_Cases: string (nullable = true)
 |-- Compliance_Violations: string (nullable = true)
 |-- Fraud_Cases_Detected: string (nullable = true)
 |-- Industry_Affected: string (nullable = true)
 |-- Total_Revenue_Impact: string (nullable = true)
 |-- AI_Used_for_Auditing: string (nullable = true)
 |-- Employee_Workload: string (nullable = true)
 |-- Audit_Effectiveness_Score: string (nullable = true)
 |-- Client_Satisfaction_Score: string (nullable = true)



In [296]:
# Preview the first three rows without truncating cell contents.
df_pyspark.show(n=3, truncate=False)

+----+---------+-----------------------+---------------+---------------------+--------------------+-----------------+--------------------+--------------------+-----------------+-------------------------+-------------------------+
|Year|Firm_Name|Total_Audit_Engagements|High_Risk_Cases|Compliance_Violations|Fraud_Cases_Detected|Industry_Affected|Total_Revenue_Impact|AI_Used_for_Auditing|Employee_Workload|Audit_Effectiveness_Score|Client_Satisfaction_Score|
+----+---------+-----------------------+---------------+---------------------+--------------------+-----------------+--------------------+--------------------+-----------------+-------------------------+-------------------------+
|2020|PwC      |2829                   |51             |123                  |39                  |Healthcare       |114.24              |No                  |57               |5.8                      |8.4                      |
|2022|Deloitte |3589                   |185            |30                   |60

In [297]:
# List the column names.
df_pyspark.columns

['Year',
 'Firm_Name',
 'Total_Audit_Engagements',
 'High_Risk_Cases',
 'Compliance_Violations',
 'Fraud_Cases_Detected',
 'Industry_Affected',
 'Total_Revenue_Impact',
 'AI_Used_for_Auditing',
 'Employee_Workload',
 'Audit_Effectiveness_Score',
 'Client_Satisfaction_Score']

In [298]:
# List (column name, type) pairs.
df_pyspark.dtypes

[('Year', 'string'),
 ('Firm_Name', 'string'),
 ('Total_Audit_Engagements', 'string'),
 ('High_Risk_Cases', 'string'),
 ('Compliance_Violations', 'string'),
 ('Fraud_Cases_Detected', 'string'),
 ('Industry_Affected', 'string'),
 ('Total_Revenue_Impact', 'string'),
 ('AI_Used_for_Auditing', 'string'),
 ('Employee_Workload', 'string'),
 ('Audit_Effectiveness_Score', 'string'),
 ('Client_Satisfaction_Score', 'string')]

In [299]:
# Columns not needed for the per-firm case counts below.
unused_columns = (
    'Industry_Affected',
    'Total_Revenue_Impact',
    'AI_Used_for_Auditing',
    'Employee_Workload',
    'Audit_Effectiveness_Score',
    'Client_Satisfaction_Score',
)
df_pyspark = df_pyspark.drop(*unused_columns)

In [300]:
# Preview the remaining columns after the drop.
df_pyspark.show()

+----+-------------+-----------------------+---------------+---------------------+--------------------+
|Year|    Firm_Name|Total_Audit_Engagements|High_Risk_Cases|Compliance_Violations|Fraud_Cases_Detected|
+----+-------------+-----------------------+---------------+---------------------+--------------------+
|2020|          PwC|                   2829|             51|                  123|                  39|
|2022|     Deloitte|                   3589|            185|                   30|                  60|
|2020|          PwC|                   2438|            212|                  124|                  97|
|2021|          PwC|                   2646|            397|                   55|                  97|
|2020|          PwC|                   2680|            216|                   99|                  46|
|2023|     Deloitte|                    818|            448|                   10|                  30|
|2022|     Deloitte|                   1199|            148|    

In [301]:
# Drop every row whose Firm_Name is null.
df_pyspark = df_pyspark.na.drop(
    how='any',
    subset=['Firm_Name'],
)

In [302]:
# Preview the data again after removing rows with a null Firm_Name.
df_pyspark.show()

+----+-------------+-----------------------+---------------+---------------------+--------------------+
|Year|    Firm_Name|Total_Audit_Engagements|High_Risk_Cases|Compliance_Violations|Fraud_Cases_Detected|
+----+-------------+-----------------------+---------------+---------------------+--------------------+
|2020|          PwC|                   2829|             51|                  123|                  39|
|2022|     Deloitte|                   3589|            185|                   30|                  60|
|2020|          PwC|                   2438|            212|                  124|                  97|
|2021|          PwC|                   2646|            397|                   55|                  97|
|2020|          PwC|                   2680|            216|                   99|                  46|
|2023|     Deloitte|                    818|            448|                   10|                  30|
|2022|     Deloitte|                   1199|            148|    

# 1. Fraud_Cases_Detected 

In [303]:
# Indexing a DataFrame by name yields a Column expression, not the data itself.
type(df_pyspark['Fraud_Cases_Detected'])

pyspark.sql.column.Column

In [304]:
# The column was read as a string; convert it to integers so it can be summed.
df_pyspark_Fraud_Cases_Detected = df_pyspark.withColumn(
    'Fraud_Cases_Detected',
    col('Fraud_Cases_Detected').cast(IntegerType()),
)

In [305]:
# Total fraud cases per firm. Built directly from df_pyspark (cast + aggregate
# in one expression) so this cell can be re-run on its own: aggregating the
# variable in place would fail on a second run, because the grouped frame no
# longer has a 'Fraud_Cases_Detected' column.
df_pyspark_Fraud_Cases_Detected = (
    df_pyspark
    .withColumn('Fraud_Cases_Detected', col('Fraud_Cases_Detected').cast(IntegerType()))
    .groupBy('Firm_Name')
    .sum('Fraud_Cases_Detected')
)

In [306]:
# Display the summed fraud cases for each firm.
df_pyspark_Fraud_Cases_Detected.show()

+-------------+-------------------------+
|    Firm_Name|sum(Fraud_Cases_Detected)|
+-------------+-------------------------+
|     Deloitte|                     1481|
|         KPMG|                     1319|
|Ernst & Young|                     1031|
|          PwC|                     1439|
+-------------+-------------------------+



In [311]:
# Save the DataFrame as a Hive table. mode("overwrite") replaces the table if it
# already exists; the default save mode raises an error in that case, which
# breaks re-running the notebook.
df_pyspark_Fraud_Cases_Detected.write.mode("overwrite").saveAsTable("Fraud_Cases_Detected")

In [317]:
# Read the saved table back through Spark SQL to confirm the write succeeded.
fraud_cases_table = spark.sql("SELECT * FROM Fraud_Cases_Detected")
fraud_cases_table.show()

+-------------+-------------------------+
|    Firm_Name|sum(Fraud_Cases_Detected)|
+-------------+-------------------------+
|     Deloitte|                     1481|
|         KPMG|                     1319|
|Ernst & Young|                     1031|
|          PwC|                     1439|
+-------------+-------------------------+



# 2. Compliance_Violations

In [307]:
# Indexing a DataFrame by name yields a Column expression, not the data itself.
type(df_pyspark['Compliance_Violations'])

pyspark.sql.column.Column

In [312]:
# The column was read as a string; convert it to integers so it can be summed.
df_pyspark_Compliance_Violations = df_pyspark.withColumn(
    'Compliance_Violations',
    col('Compliance_Violations').cast(IntegerType()),
)

In [314]:
# Total compliance violations per firm. Built directly from df_pyspark (cast +
# aggregate in one expression) so this cell can be re-run on its own:
# aggregating the variable in place would fail on a second run, because the
# grouped frame no longer has a 'Compliance_Violations' column.
df_pyspark_Compliance_Violations = (
    df_pyspark
    .withColumn('Compliance_Violations', col('Compliance_Violations').cast(IntegerType()))
    .groupBy('Firm_Name')
    .sum('Compliance_Violations')
)

In [310]:
# Display the summed compliance violations for each firm.
df_pyspark_Compliance_Violations.show()

+-------------+--------------------------+
|    Firm_Name|sum(Compliance_Violations)|
+-------------+--------------------------+
|     Deloitte|                      3127|
|         KPMG|                      2070|
|Ernst & Young|                      2724|
|          PwC|                      2627|
+-------------+--------------------------+



In [315]:
# Save the DataFrame as a Hive table. mode("overwrite") replaces the table if it
# already exists; the default save mode raises an error in that case, which
# breaks re-running the notebook.
df_pyspark_Compliance_Violations.write.mode("overwrite").saveAsTable("Compliance_Violations")

In [316]:
# Read the saved table back through Spark SQL to confirm the write succeeded.
compliance_violations_table = spark.sql("SELECT * FROM Compliance_Violations")
compliance_violations_table.show()

+-------------+--------------------------+
|    Firm_Name|sum(Compliance_Violations)|
+-------------+--------------------------+
|     Deloitte|                      3127|
|         KPMG|                      2070|
|Ernst & Young|                      2724|
|          PwC|                      2627|
+-------------+--------------------------+

