In [1]:
# by default spark object is available
spark

In [2]:
# stopping existing running spark
spark.stop()

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.master("spark://localhost:7077").appName("ETL").getOrCreate()

url = "https://files.consumerfinance.gov/ccdb/complaints.json.zip"

from six.moves import urllib

!ls /

DATA_DIR = "/data/financial_complaint_data"

import os

os.makedirs(DATA_DIR,exist_ok = True)

!ls /data

filename = os.path.basename(url)

filename

file_path = os.path.join(DATA_DIR,filename)

file_path

urllib.request.urlretrieve(url,file_path)

EXTRACT_DIR = os.path.join(DATA_DIR,"Extracted_Dir")

EXTRACT_DIR

os.makedirs(EXTRACT_DIR)

from zipfile import ZipFile

ZipFile(file_path,"r").extractall(EXTRACT_DIR)

!ls /data/financial_complaint_data/Extracted_Dir/

json_file_path = "/data/financial_complaint_data/Extracted_Dir/complaints.json"
json_file_path

# creating dir in the Hadoop
!hdfs dfs -mkdir /financial_complaint_data

# load data from docker to Hadoop
!hdfs dfs -put /data/financial_complaint_data/Extracted_Dir/complaints.json /financial_complaint_data

In [5]:
dfcomplaints = spark.read.json('/financial_complaint_data/complaints.json')

In [6]:
dfcomplaints.show(5)

+---------------+--------------------+-----------------------+--------------------+------------+-----------------------+-------------------------+-----------------+-------------+--------------------+--------------------+--------------------+-----+--------------------+----------------+-------------+----+------+--------+
|_corrupt_record|             company|company_public_response|    company_response|complaint_id|complaint_what_happened|consumer_consent_provided|consumer_disputed|date_received|date_sent_to_company|               issue|             product|state|           sub_issue|     sub_product|submitted_via|tags|timely|zip_code|
+---------------+--------------------+-----------------------+--------------------+------------+-----------------------+-------------------------+-----------------+-------------+--------------------+--------------------+--------------------+-----+--------------------+----------------+-------------+----+------+--------+
|              [|                null

In [7]:
dfcomplaints.createTempView('complaints')

In [12]:
dfcomplaints.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- company: string (nullable = true)
 |-- company_public_response: string (nullable = true)
 |-- company_response: string (nullable = true)
 |-- complaint_id: string (nullable = true)
 |-- complaint_what_happened: string (nullable = true)
 |-- consumer_consent_provided: string (nullable = true)
 |-- consumer_disputed: string (nullable = true)
 |-- date_received: string (nullable = true)
 |-- date_sent_to_company: string (nullable = true)
 |-- issue: string (nullable = true)
 |-- product: string (nullable = true)
 |-- state: string (nullable = true)
 |-- sub_issue: string (nullable = true)
 |-- sub_product: string (nullable = true)
 |-- submitted_via: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- timely: string (nullable = true)
 |-- zip_code: string (nullable = true)



In [14]:
spark.sql('select count(distinct complaint_id) from complaints').show(5)

+----------------------------+
|count(DISTINCT complaint_id)|
+----------------------------+
|                     2794502|
+----------------------------+



In [20]:
df_complaints_per_state = spark.sql('select state,count(distinct complaint_id) as no_of_complaints from complaints group by state order by no_of_complaints desc limit 5')

In [21]:
type(df_complaints_per_state)

pyspark.sql.dataframe.DataFrame

In [24]:
# convert to pandas dataframe so we can use all pandas functionality of indexing...etc
df_complaints_per_state = df_complaints_per_state.toPandas()

In [25]:
df_complaints_per_state

Unnamed: 0,state,no_of_complaints
0,CA,347894
1,FL,311341
2,TX,269759
3,NY,184512
4,GA,169067


In [27]:
df_complaints_per_state.loc[:2]

Unnamed: 0,state,no_of_complaints
0,CA,347894
1,FL,311341
2,TX,269759


In [28]:
df_complaints_per_state[:2]

Unnamed: 0,state,no_of_complaints
0,CA,347894
1,FL,311341
