In [2]:
spark.stop()

In [4]:
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.master("spark://localhost:7077").appName("ETL").getOrCreate()

In [6]:
url="https://files.consumerfinance.gov/ccdb/complaints.json.zip"

In [7]:
from six.moves import urllib

In [10]:
DATA_DIR = "/data/finance_complaint_data"

In [11]:
import os

In [12]:
os.makedirs(DATA_DIR,exist_ok=True)

In [14]:
file_name = os.path.basename(url)

In [15]:
file_name

'complaints.json.zip'

In [16]:
file_path = os.path.join(DATA_DIR,file_name)

In [17]:
file_path


'/data/finance_complaint_data/complaints.json.zip'

In [18]:
urllib.request.urlretrieve(url,file_path )

('/data/finance_complaint_data/complaints.json.zip',
 <http.client.HTTPMessage at 0x7f0d9f3c1bd0>)

In [19]:
EXTRACT_DIR = os.path.join(DATA_DIR, "extracted_data")

In [20]:
EXTRACT_DIR

'/data/finance_complaint_data/extracted_data'

In [22]:
os.makedirs(EXTRACT_DIR,exist_ok=True)

In [23]:
from zipfile import ZipFile

In [24]:
with ZipFile(file_path,"r") as zip_file:
    zip_file.extractall(EXTRACT_DIR)

In [25]:
!ls /data/finance_complaint_data/extracted_data

complaints.json


In [26]:
json_file_path = "file:///data/finance_complaint_data/extracted_data/complaints.json"

In [27]:
!hdfs dfs -mkdir /finance_complaint_data

In [28]:
!hdfs dfs -put /data/finance_complaint_data/extracted_data/complaints.json  /finance_complaint_data

2022-06-26 22:04:56,310 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2022-06-26 22:05:11,131 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2022-06-26 22:05:26,206 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2022-06-26 22:05:41,856 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2022-06-26 22:05:57,484 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2022-06-26 22:06:17,023 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2022-06-26 22:06:32,130 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2022-06-26 22:06:46,137 INF

In [29]:
df=spark.read.json("/finance_complaint_data/complaints.json")

In [30]:
df.show(3)

+---------------+--------------------+-----------------------+--------------------+------------+-----------------------+-------------------------+-----------------+-------------+--------------------+--------------------+--------------------+-----+--------------------+----------------+-------------+----+------+--------+
|_corrupt_record|             company|company_public_response|    company_response|complaint_id|complaint_what_happened|consumer_consent_provided|consumer_disputed|date_received|date_sent_to_company|               issue|             product|state|           sub_issue|     sub_product|submitted_via|tags|timely|zip_code|
+---------------+--------------------+-----------------------+--------------------+------------+-----------------------+-------------------------+-----------------+-------------+--------------------+--------------------+--------------------+-----+--------------------+----------------+-------------+----+------+--------+
|              [|                null

In [37]:
df.createTempView("complaint")

AnalysisException: Temporary view 'complaint' already exists

In [33]:
df.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- company: string (nullable = true)
 |-- company_public_response: string (nullable = true)
 |-- company_response: string (nullable = true)
 |-- complaint_id: string (nullable = true)
 |-- complaint_what_happened: string (nullable = true)
 |-- consumer_consent_provided: string (nullable = true)
 |-- consumer_disputed: string (nullable = true)
 |-- date_received: string (nullable = true)
 |-- date_sent_to_company: string (nullable = true)
 |-- issue: string (nullable = true)
 |-- product: string (nullable = true)
 |-- state: string (nullable = true)
 |-- sub_issue: string (nullable = true)
 |-- sub_product: string (nullable = true)
 |-- submitted_via: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- timely: string (nullable = true)
 |-- zip_code: string (nullable = true)



In [40]:
spark.sql("select company,company_response from complaint").show(5)

+--------------------+--------------------+
|             company|    company_response|
+--------------------+--------------------+
|                null|                null|
|       EQUIFAX, INC.|         In progress|
|Experian Informat...|Closed with non-m...|
|TRANSUNION INTERM...|         In progress|
|TRANSUNION INTERM...|         In progress|
+--------------------+--------------------+
only showing top 5 rows



## How many complaint has been registered for every product

In [42]:
spark.sql("select count(distinct(product)) no_of_product from complaint").show(1)


+-------------+
|no_of_product|
+-------------+
|           18|
+-------------+



In [43]:
spark.sql("select count(*) no_of_product from complaint").show(1)


+-------------+
|no_of_product|
+-------------+
|      2751348|
+-------------+



In [48]:
df=spark.sql("select product,count(complaint_id) no_of_complaint from complaint group by product")


In [50]:
df_pandas = df.toPandas()

In [51]:
df_pandas

Unnamed: 0,product,no_of_complaint
0,Debt collection,429501
1,,0
2,Payday loan,5543
3,Money transfers,5354
4,"Money transfer, virtual currency, or money ser...",41728
5,Checking or savings account,125928
6,Mortgage,356247
7,Prepaid card,3819
8,Credit card or prepaid card,147774
9,Credit reporting,140430


In [54]:
df=spark.sql("select count(distinct(state)) no_of_complaint from complaint")


In [55]:
df.show()

+---------------+
|no_of_complaint|
+---------------+
|             64|
+---------------+



In [63]:
df=spark.sql("select * from (select state,count(complaint_id) no_of_complaint from complaint group by state)state_info  order by no_of_complaint desc limit 5")


In [62]:
df.show()

+-----+---------------+
|state|no_of_complaint|
+-----+---------------+
|   CA|         343563|
|   FL|         306011|
|   TX|         264887|
|   NY|         181990|
|   GA|         165576|
|   PA|         120954|
|   IL|         102131|
|   NJ|          95579|
|   NC|          84057|
|   OH|          74884|
|   MD|          73548|
|   VA|          70664|
|   MI|          60125|
|   TN|          53785|
|   AZ|          50219|
|   AL|          48972|
|   SC|          47211|
|   MA|          45334|
|   LA|          43031|
|   WA|          40843|
+-----+---------------+
only showing top 20 rows

