<h2>ClearScore PySpark Project</h2>

This four-hour project processes and analyzes ClearScore data using Apache Spark.
It performs data transformation and aggregation tasks to provide insights into users' credit scores, employment status and bank data.

NOTE: BEFORE running, change the path values to the location where the report folders were extracted locally.

<h3>Future Considerations</h3>

<h4>Scalability/Production Approach</h4>
Using PySpark allows this project to scale to big data processing.
<br>PySpark can be run as part of an ETL pipeline in AWS or Databricks.
<br>For data exploration, notebooks can also be used in Databricks.

<h4>Run Convenience</h4>
For ease of running this project, a Jupyter notebook was used.
<br>This allows quick viewing of dataframes.

<h4>Test Quality and Coverage</h4>
Pytest cases are included in code_tests.py.
<br>With more time, the tests would be extended towards the standard target of 80% coverage.

<h4>Documentation</h4>
Docstrings and comments are included throughout the code.
<br>There is a README describing the code versions included.
<br>Later improvements could include automated documentation tools that pull the docstrings into more detailed documentation.
<br>A troubleshooting section would have been included if more time had been available.


In [8]:
#Imports used
import pytest
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import nest_asyncio
from pyspark.sql.functions import desc
from pyspark.sql.window import Window
import json
import clearscore_calculations

#PLEASE CHANGE THESE PATHS TO WHERE FOLDERS ARE EXTRACTED LOCALLY!
reports_path = 'E:/Technical Tests/ClearScore/bulk-reports/reports'
accounts_path = 'E:/Technical Tests/ClearScore/bulk-reports/accounts'

In [9]:
#nest_asyncio allows pyspark to run error-free in jupyter notebook
nest_asyncio.apply()

In [10]:
#Run pyspark tests to ensure correct functionality before processing reports
!pytest code_tests.py

platform win32 -- Python 3.9.1, pytest-7.4.2, pluggy-1.3.0
rootdir: C:\Users\hayle\AppData\Local\Programs\Python\Python39\Scripts\test
plugins: anyio-2.2.0
collected 3 items

code_tests.py ...                                                        [100%]

SUCCESS: The process with PID 32508 (child process of PID 25084) has been terminated.
SUCCESS: The process with PID 25084 (child process of PID 41648) has been terminated.
SUCCESS: The process with PID 41648 (child process of PID 1200) has been terminated.


In [11]:
#Load in json datasets as pyspark dataframes
df_reports = clearscore_calculations.create_spark_df(reports_path)
df_accounts = clearscore_calculations.create_spark_df(accounts_path)
#Remove unnecessary columns to keep the datasets as small as possible
df_accounts = df_accounts.drop('marketId', 'schemaVersion', 'deletedTimestamp', 'updatedTimestamp').withColumnRenamed('uuid', 'user-uuid')
df_reports = df_reports.drop('bureau-id', 'client-ref')
#Show sample of data
df_reports.show()
df_accounts.show()

+----------+-------------------+--------------------+---------+--------------------+
|account-id|   pulled-timestamp|              report|report-id|           user-uuid|
+----------+-------------------+--------------------+---------+--------------------+
|    655300|2019-01-15T03:48:53|{{{[{94d9a04c41d3...| 44084034|8a7cd396-4de6-4ca...|
|    655300|2018-12-15T00:43:44|{{{[{94d9a04c41d3...| 43405579|8a7cd396-4de6-4ca...|
|    655300|2018-11-14T00:54:56|{{{[{94d9a04c41d3...| 42760695|8a7cd396-4de6-4ca...|
|     31300|2018-07-10T13:43:24|{{{[{1aa1a3d5e8ad...| 40534392|84b111a6-2747-441...|
|     31300|2018-05-10T00:34:39|{{{[{1aa1a3d5e8ad...| 39565862|84b111a6-2747-441...|
|     31300|2018-06-10T00:46:43|{{{[{1aa1a3d5e8ad...| 40032979|84b111a6-2747-441...|
|     31300|2018-04-09T00:22:16|{{{[{1aa1a3d5e8ad...| 39146400|84b111a6-2747-441...|
|     31300|2018-12-10T00:01:04|{{{[{1aa1a3d5e8ad...| 43308952|84b111a6-2747-441...|
|    655300|2018-05-28T09:28:20|{{{[{552525fc2deb...| 39901445|8a

<h3>Average Credit Score (from all reports)</h3>

In [12]:
#Pull scores from user reports and calculate average
df_scores = clearscore_calculations.get_scores(df_reports)
average_score = round(df_scores.select(F.avg('score')).first()[0])
print(f'The average credit score is {average_score}')

The average credit score is 532
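
The cell above applies Python's built-in `round` to the float returned by the Spark average. As a small plain-Python illustration of that last step (the scores below are hypothetical, not taken from the dataset), note that `round` sends exact halves to the nearest even integer, which can matter if an average lands exactly on .5:

```python
from statistics import mean

# Hypothetical scores for illustration only; the real values come from the report JSON.
scores = [510, 540, 547]

# Same final step as the notebook cell: average, then round to a whole number.
average_score = round(mean(scores))
print(f"The average credit score is {average_score}")

# Python 3 rounds exact halves to the nearest even integer ("banker's rounding").
half_down = round(532.5)
half_up = round(533.5)
print(half_down, half_up)
```

If half-up rounding were required instead, `decimal.Decimal.quantize` with `ROUND_HALF_UP` would be one option.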


<h3>Number of Users Under Each Employment Status</h3>

In [13]:
#Group users by employment status and cache for quicker loading in functions below
df_accounts_employment = clearscore_calculations.get_employment_status(df_accounts)
df_accounts_employment.cache()
employment_status_count = df_accounts_employment.groupBy('employment_status').count()
employment_status_count.show()

+-----------------+-----+
|employment_status|count|
+-----------------+-----+
|      FT_EMPLOYED|   42|
|      PT_EMPLOYED|    2|
|       UNEMPLOYED|    3|
|    SELF_EMPLOYED|    6|
|     WORK_AT_HOME|    1|
|          STUDENT|    1|
|             null|    4|
+-----------------+-----+
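
The implementation of `get_employment_status` is not shown in this notebook, so the following is only a plain-Python sketch of the counting step, assuming each account record exposes an `employment_status` value that may be missing. The records and identifiers are hypothetical. Like the `null` row above, a missing status is counted as its own group rather than dropped.

```python
from collections import Counter

# Hypothetical account records; None stands for a missing employment status.
accounts = [
    {"user-uuid": "u1", "employment_status": "FT_EMPLOYED"},
    {"user-uuid": "u2", "employment_status": "FT_EMPLOYED"},
    {"user-uuid": "u3", "employment_status": "STUDENT"},
    {"user-uuid": "u4", "employment_status": None},
]

# Count one user per record, grouping by status (None forms its own group).
status_counts = Counter(account["employment_status"] for account in accounts)

for status, count in status_counts.most_common():
    print(status, count)
```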



<h3>Count of Users In Credit Score Ranges</h3>

In [14]:
#Get most recent reports for each user and count number of users in credit ranges
#Cache for quicker reload later
most_recent_scores_df = clearscore_calculations.get_most_recent_reports(df_scores)
most_recent_scores_df.cache()
num_users_score_range = clearscore_calculations.get_score_range_count(most_recent_scores_df)
num_users_score_range.show()

+----+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|0-50|51-100|101-150|151-200|201-250|251-300|301-350|351-400|401-450|451-500|501-550|551-600|601-650|651-700|
+----+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|   4|     0|      0|      0|      0|      0|      0|      0|      0|      1|      7|     10|      5|     12|
+----+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
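
The helpers `get_most_recent_reports` and `get_score_range_count` are defined in `clearscore_calculations` and are not shown here. The sketch below illustrates the same two steps in plain Python under stated assumptions: timestamps share the ISO 8601 layout seen in the sample data (so string comparison orders them correctly), and the bucket edges and the upper bound of 700 are read off the column names in the table above. The function names and sample records are hypothetical. In PySpark, a common way to pick the latest row per user is `row_number()` over a `Window` partitioned by user and ordered by timestamp descending; whether the helper does this is not confirmed by the source.

```python
from collections import Counter


def score_range_label(score: int, width: int = 50, top: int = 700) -> str:
    """Return the bucket label for a score: '0-50', '51-100', ..., '651-700'."""
    if not 0 <= score <= top:
        raise ValueError(f"score {score} is outside 0-{top}")
    if score <= width:
        return f"0-{width}"  # the first bucket also contains 0
    lower = ((score - 1) // width) * width + 1
    return f"{lower}-{lower + width - 1}"


def most_recent_per_user(reports):
    """Keep only the report with the latest 'pulled-timestamp' for each user."""
    latest = {}
    for report in reports:
        current = latest.get(report["user-uuid"])
        # ISO 8601 strings in the same layout compare chronologically.
        if current is None or report["pulled-timestamp"] > current["pulled-timestamp"]:
            latest[report["user-uuid"]] = report
    return list(latest.values())


# Hypothetical reports for illustration only.
reports = [
    {"user-uuid": "u1", "pulled-timestamp": "2018-11-14T00:54:56", "score": 480},
    {"user-uuid": "u1", "pulled-timestamp": "2019-01-15T03:48:53", "score": 655},
    {"user-uuid": "u2", "pulled-timestamp": "2018-07-10T13:43:24", "score": 0},
    {"user-uuid": "u3", "pulled-timestamp": "2018-12-10T00:01:04", "score": 51},
]

range_counts = Counter(
    score_range_label(report["score"]) for report in most_recent_per_user(reports)
)
print(dict(range_counts))
```

Counting only the latest report per user keeps each user in exactly one range, which is why the older `u1` report with a score of 480 does not appear in the counts.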



<h3>Enriched Bank Data</h3>

In [15]:
#Pull bank data and combine datasets
enriched_df = clearscore_calculations.create_enriched_df(most_recent_scores_df, df_accounts_employment)
enriched_df.show()

+--------------------+-------------------+------------------------------+-------------------------+-----------------+-------------------+
|           user-uuid|   pulled-timestamp|number_of_active_bank_accounts|total_outstanding_balance|employment_status|          bank_name|
+--------------------+-------------------+------------------------------+-------------------------+-----------------+-------------------+
|979dfe0c-5374-48e...|2019-01-23T16:01:46|                             3|                   984212|      FT_EMPLOYED|      STANDARD_BANK|
|ab5a2fe6-f2c9-42a...|2018-12-20T08:35:09|                             0|                        0|      FT_EMPLOYED|FIRST_NATIONAL_BANK|
|bfcc1755-060b-49f...|2019-01-17T01:34:29|                             0|                        0|      FT_EMPLOYED|            NEDBANK|
|405e0215-a0db-442...|2019-01-14T02:48:31|                             2|                    92562|      FT_EMPLOYED|               ABSA|
|4d14f02f-836e-4fa...|2018-12-29T0

In [16]:
#Output as CSVs in temp folder
temp_path = clearscore_calculations.create_temp_folder()
clearscore_calculations.df_to_csv(num_users_score_range, 'Score_Range_Count', temp_path)
clearscore_calculations.df_to_csv(employment_status_count, 'Employment_Status_Count', temp_path)
clearscore_calculations.df_to_csv(enriched_df, 'Enriched_Bank_Data', temp_path)
clearscore_calculations.stop_spark()

File has been written to "C:\Users\hayle\AppData\Local\Temp\tmpegh5l_to"
File has been written to "C:\Users\hayle\AppData\Local\Temp\tmpegh5l_to"
File has been written to "C:\Users\hayle\AppData\Local\Temp\tmpegh5l_to"
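
The helpers `create_temp_folder` and `df_to_csv` are not shown in this notebook. As a rough standard-library sketch of the same idea (create a temporary folder, then write one named CSV per result), assuming the rows are available as plain dictionaries; `write_csv` is a hypothetical stand-in, not the project's function:

```python
import csv
import tempfile
from pathlib import Path


def write_csv(rows, name, folder):
    """Write a list of dicts to <folder>/<name>.csv and return the file path."""
    path = Path(folder) / f"{name}.csv"
    with path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
    return path


# mkdtemp creates a new, uniquely named folder that is NOT deleted automatically.
temp_path = tempfile.mkdtemp()

# Two rows borrowed from the employment status table for illustration.
rows = [
    {"employment_status": "FT_EMPLOYED", "count": 42},
    {"employment_status": "STUDENT", "count": 1},
]

csv_path = write_csv(rows, "Employment_Status_Count", temp_path)
print(f'File has been written to "{csv_path}"')
```

Printing the full file path, rather than only the folder, makes the three confirmation messages distinguishable from one another.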
