# MAP REDUCE USING PYSPARK 

Dataset: NYC 311 CALL CENTER INQUIRY DATA

Big Data Framework: Apache Spark (via PySpark)

Link to access the data: https://data.cityofnewyork.us/City-Government/311-Call-Center-Inquiry/wewp-mm3p/about_data

# INSTALL PYSPARK 

In [None]:
# Install the two packages this notebook imports below: pyspark itself and
# findspark (used via findspark.init()).
!pip install pyspark
!pip install findspark




# IMPORT LIBRARIES

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name
import glob
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col
import findspark
findspark.init()
import pandas as pd
import matplotlib.pyplot as plt
import pylab as pl
from pyspark.sql.functions import year,month
from pyspark.sql.types import DateType
import pyspark.sql.functions as F
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql.functions import split, col, concat_ws

# INITIALISE SPARK SESSION 

In [None]:
# Create the Spark session for this notebook, or reuse one that is already
# running. NOTE(review): 'config_option'/'value' looks like a placeholder
# setting -- confirm whether a real option was intended.
spark = (
    SparkSession.builder
    .appName("ReadAllFiles")
    .config('config_option', 'value')
    .getOrCreate()
)

In [None]:
# Load the 311 inquiry export; the first line of the file supplies the column
# names. No schema is given, so the columns are read as-is from the CSV.
df = spark.read.option("header", True).csv('311_Call_Center_Inquiry.csv')

In [None]:
# Preview the first rows to check that the header and columns were read
# correctly.
df.show()

+---------+----------+-----------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+
|UNIQUE_ID|      DATE|       TIME|           DATE_TIME|  AGENCY|         AGENCY_NAME|        INQUIRY_NAME|   BRIEF_DESCRIPTION|     CALL_RESOLUTION|
+---------+----------+-----------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+
| 79381835|11/07/2011|10:03:18 AM|11/07/2011 10:03:...|     DOB|Department of Bui...|Schedule a Plan E...|Request an appoin...|      Scheduled Appt|
| 79381836|11/07/2011| 3:51:49 PM|11/07/2011 03:51:...|   DOHMH|Department of Hea...|NYC Well - Drug a...|Information and t...|Transfer to City ...|
| 79381837|11/07/2011|10:01:11 AM|11/07/2011 10:01:...|   3-1-1|   3-1-1 Call Center|Inmate Location a...|Get information o...|   Internal Transfer|
| 79381838|11/07/2011| 6:31:44 PM|11/07/2011 06:31:...|TREASURY|Department of the...|Federal Tax Assis...|

# DATA CLEANING 

In [None]:
# Parse the DATE strings into a real date column.
# The export is month-first (NYC Open Data writes MM/DD/YYYY), so
# "11/07/2011" is 7 November 2011. The previous "dd/MM/yyyy" pattern swapped
# day and month, and any row whose day-of-month is above 12 could not be
# parsed as a valid date, which corrupted the Year/Month columns derived later.
# NOTE: cell outputs recorded before this fix still show the swapped values
# (e.g. 2011-07-11); re-run the notebook to refresh them.
date_format = "MM/dd/yyyy"
date = df.withColumn("DATE", F.to_date(F.col("DATE"), date_format))
date.show()

+---------+----------+-----------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+
|UNIQUE_ID|      DATE|       TIME|           DATE_TIME|  AGENCY|         AGENCY_NAME|        INQUIRY_NAME|   BRIEF_DESCRIPTION|     CALL_RESOLUTION|
+---------+----------+-----------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+
| 79381835|2011-07-11|10:03:18 AM|11/07/2011 10:03:...|     DOB|Department of Bui...|Schedule a Plan E...|Request an appoin...|      Scheduled Appt|
| 79381836|2011-07-11| 3:51:49 PM|11/07/2011 03:51:...|   DOHMH|Department of Hea...|NYC Well - Drug a...|Information and t...|Transfer to City ...|
| 79381837|2011-07-11|10:01:11 AM|11/07/2011 10:01:...|   3-1-1|   3-1-1 Call Center|Inmate Location a...|Get information o...|   Internal Transfer|
| 79381838|2011-07-11| 6:31:44 PM|11/07/2011 06:31:...|TREASURY|Department of the...|Federal Tax Assis...|

In [None]:
# Derive calendar Year and Month columns from the parsed DATE column.
# Both lookups now use the column name exactly as it appears in the schema
# ("DATE"). The mixed-case "Date" only resolved because Spark's column
# resolution is case-insensitive by default; it would fail with
# spark.sql.caseSensitive=true.
df = date.withColumn("Year", year("DATE")).withColumn("Month", month("DATE"))
df.show()

+---------+----------+-----------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+----+-----+
|UNIQUE_ID|      DATE|       TIME|           DATE_TIME|  AGENCY|         AGENCY_NAME|        INQUIRY_NAME|   BRIEF_DESCRIPTION|     CALL_RESOLUTION|Year|Month|
+---------+----------+-----------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+----+-----+
| 79381835|2011-07-11|10:03:18 AM|11/07/2011 10:03:...|     DOB|Department of Bui...|Schedule a Plan E...|Request an appoin...|      Scheduled Appt|2011|    7|
| 79381836|2011-07-11| 3:51:49 PM|11/07/2011 03:51:...|   DOHMH|Department of Hea...|NYC Well - Drug a...|Information and t...|Transfer to City ...|2011|    7|
| 79381837|2011-07-11|10:01:11 AM|11/07/2011 10:01:...|   3-1-1|   3-1-1 Call Center|Inmate Location a...|Get information o...|   Internal Transfer|2011|    7|
| 79381838|2011-07-11| 6:31:44 PM|11/07/

In [None]:
# Total number of rows in the DataFrame.
df.count()

92767867

# MAP REDUCE

In [None]:
# Project the three categorical columns that are counted in the map/reduce
# step, one single-column DataFrame each.
# Column names are spelled exactly as in the schema; "Inquiry_NAME" only
# resolved because Spark's column lookup is case-insensitive by default.
Inquiry_Agency = df.select("AGENCY")
Inquiry_Name = df.select("INQUIRY_NAME")
Inquiry_Resolution = df.select("CALL_RESOLUTION")

In [None]:
# Convert each single-column DataFrame to an RDD and run a classic map/reduce
# count: map every row to (value, 1), then sum the ones per key.
def _count_by_value(single_column_df):
    """Return an RDD of (value, occurrences) pairs for a one-column DataFrame.

    The key is the column's plain value (row[0]) rather than the whole Row
    object. Keying on the Row made the results one-field structs, which showed
    up downstream as "(DHS,)" / Row(AGENCY='DHS') instead of "DHS".
    """
    return (
        single_column_df.rdd
        .map(lambda row: (row[0], 1))
        .reduceByKey(lambda left, right: left + right)
    )


# NOTE: outputs recorded before this change still show Row(...) keys; re-run
# the notebook to refresh them.
mapreduce_Agency = _count_by_value(Inquiry_Agency)
mapreduce_Inquiry_Name = _count_by_value(Inquiry_Name)
mapreduce_Resolution = _count_by_value(Inquiry_Resolution)

In [None]:
# Sanity-check the map/reduce result by pulling every (key, count) pair for
# the agency column back to the driver. The result holds one pair per distinct
# agency, so it is small even though the input is large.
mapreduce_Agency.collect()

[(Row(AGENCY='DHS'), 983079),
 (Row(AGENCY='DOI'), 35759),
 (Row(AGENCY='CHALL'), 135083),
 (Row(AGENCY='CCPC'), 36),
 (Row(AGENCY='ZTSTAGY'), 7),
 (Row(AGENCY='IA'), 67361),
 (Row(AGENCY='CWI'), 739),
 (Row(AGENCY='OCHIA'), 9),
 (Row(AGENCY='CQCAPD'), 3034),
 (Row(AGENCY='CITIFIELD'), 2391),
 (Row(AGENCY='NYSPARKS'), 1426),
 (Row(AGENCY='TSASC'), 11),
 (Row(AGENCY='ACS'), 758248),
 (Row(AGENCY='MOIA'), 130407),
 (Row(AGENCY='ABC'), 12921),
 (Row(AGENCY='MAIG'), 5),
 (Row(AGENCY='NYCOURTS'), 1194115),
 (Row(AGENCY='DOITT'), 66641),
 (Row(AGENCY='BPL'), 32714),
 (Row(AGENCY='NYPD'), 8742374),
 (Row(AGENCY='NYSDOH'), 317653),
 (Row(AGENCY='OPA'), 18041),
 (Row(AGENCY='REDCROSS'), 7436),
 (Row(AGENCY='BSA'), 889),
 (Row(AGENCY='GROWNYC'), 7683),
 (Row(AGENCY='CEO'), 659),
 (Row(AGENCY='BBB'), 61581),
 (Row(AGENCY='BPBRX'), 228),
 (Row(AGENCY='DSNY'), 5769668),
 (Row(AGENCY='NYCRGB'), 42551),
 (Row(AGENCY='MFANYC'), 18798),
 (Row(AGENCY='USMARINE'), 3769),
 (Row(AGENCY='NYSDCJS'), 3126),
 

In [None]:
# Turn each (key, count) RDD back into a two-column Spark DataFrame, naming
# the key column after what it holds and the count column "Number".
Inquiry_Name_output = mapreduce_Inquiry_Name.toDF(["Inquiry_Name", "Number"])
Resolution_output = mapreduce_Resolution.toDF(["Resolution", "Number"])
agency_output = mapreduce_Agency.toDF(["Agency", "Number"])


In [None]:
# Bring the three aggregated results to the driver as pandas DataFrames.
# The same names are rebound from Spark DataFrames to pandas DataFrames.
agency_output, Resolution_output, Inquiry_Name_output = [
    spark_frame.toPandas()
    for spark_frame in (agency_output, Resolution_output, Inquiry_Name_output)
]


In [None]:
# Display the pandas version of the per-agency counts to verify the
# conversion.
agency_output

Unnamed: 0,Agency,Number
0,"(DHS,)",983079
1,"(DOI,)",35759
2,"(CHALL,)",135083
3,"(CCPC,)",36
4,"(ZTSTAGY,)",7
...,...,...
219,"(TAT,)",4
220,"(NYSDOT,)",63674
221,"(AGMKT,)",23439
222,"(DVS,)",4565


In [None]:
# Peek at the first five per-inquiry-name counts.
Inquiry_Name_output.head(5)

Unnamed: 0,Inquiry_Name,Number
0,"(Rodent Complaint - Other Location,)",193594
1,"(Gas Appliance or Pipe Permit - Manhattan,)",890
2,"(Find a Community Board by Name,)",14153
3,"(Job Training for Public Assistance Clients,)",3604
4,"(Adult Probation Supervision - Queens,)",1624


# ENTIRE DATASET ANALYSIS

In [None]:
# Count inquiries per year.
# The aggregation runs in Spark; only the small per-year result is brought to
# pandas, where it is sorted by Year. The former Spark-side
# orderBy('count') was thrown away by that re-sort, so it is dropped to avoid
# a needless distributed sort.
year_inquiries = df.groupBy('Year').count().toPandas()
year_inquiries = year_inquiries.sort_values('Year')

In [None]:
# Build an "<hour> <AM/PM>" label from TIME:
#   time_parts  - TIME split on the space (clock text, AM/PM marker)
#   hour        - text before the first ":" of the clock part
#   am_pm       - the AM/PM marker
#   hour_am_pm  - hour and marker joined with a space, e.g. "10 AM"
df = (
    df.withColumn("time_parts", split(col("TIME"), " "))
    .withColumn("hour", split(col("time_parts").getItem(0), ":").getItem(0))
    .withColumn("am_pm", col("time_parts").getItem(1))
    .withColumn("hour_am_pm", concat_ws(" ", col("hour"), col("am_pm")))
)
# Show the source column next to the derived label as a spot check.
df.select("TIME", "hour_am_pm").show()


+-----------+----------+
|       TIME|hour_am_pm|
+-----------+----------+
|10:03:18 AM|     10 AM|
| 3:51:49 PM|      3 PM|
|10:01:11 AM|     10 AM|
| 6:31:44 PM|      6 PM|
| 8:08:27 PM|      8 PM|
|12:38:00 PM|     12 PM|
| 5:34:33 PM|      5 PM|
|11:39:12 AM|     11 AM|
| 5:14:59 PM|      5 PM|
| 9:51:01 AM|      9 AM|
| 9:28:22 AM|      9 AM|
| 9:18:06 AM|      9 AM|
| 3:42:11 PM|      3 PM|
| 9:00:59 PM|      9 PM|
|11:29:21 AM|     11 AM|
| 2:23:29 PM|      2 PM|
| 3:41:33 PM|      3 PM|
| 4:55:48 PM|      4 PM|
|11:31:58 AM|     11 AM|
|11:40:06 AM|     11 AM|
+-----------+----------+
only showing top 20 rows



In [None]:
# Count inquiries for each hour label, then bring the aggregated result to
# pandas and preview it.
grouped_df = (
    df
    .groupBy("hour_am_pm")
    .count()
)
time_inquiries = grouped_df.toPandas()
time_inquiries.head()

Unnamed: 0,hour_am_pm,count
0,6 PM,3664540
1,8 PM,2640865
2,7 AM,2108347
3,7 PM,3049605
4,6 AM,942193


# EXPORT CSV FILES FOR VISUALISATION IN POWER BI

In [None]:
# Write every aggregated pandas frame to the shared output folder as CSV,
# without the pandas index column.
path = 'drive/MyDrive/MIT805/Visual_data/'

# File name -> frame. "Inquiry_Name_output" was previously written without
# the ".csv" extension, unlike the other four files; it now matches them.
exports = {
    "agency_output.csv": agency_output,
    "year_inquiries.csv": year_inquiries,
    "Resolution_output.csv": Resolution_output,
    "Inquiry_Name_output.csv": Inquiry_Name_output,
    "Time_Inquiry.csv": time_inquiries,
}
for file_name, frame in exports.items():
    frame.to_csv(path + file_name, index=False)
