In [1]:
import findspark

# Point findspark at the local Spark installation so that `pyspark` can be imported.
# NOTE(review): 'pathtofindspark' looks like a placeholder -- replace it with the real
# Spark home directory before running.
findspark.init('pathtofindspark')

In [2]:
# import pyspark

In [3]:
from pyspark.sql import SparkSession

In [4]:
# Reuse the active SparkSession if there is one, otherwise create it with default settings.
sparkLab = SparkSession.builder.getOrCreate()

In [5]:
# Ask for the location of the input CSV. input() already returns a str; the surrounding
# whitespace that typing or pasting can add is removed so it does not end up in the path.
fileName = input("Enter the path to the csv file including the filename: ").strip()

Enter the path to the csv file including the filename: SkylineLab.csv


In [6]:
def readFile(fileName):
    """Return ``fileName`` with a ``.csv`` extension appended when it has none.

    Despite its name this helper does not open anything; it only normalises
    the path string.

    Args:
        fileName: Path (or bare name) of the CSV file, as typed by the user.

    Returns:
        ``fileName`` unchanged when its last path component already contains
        ``.csv`` (any letter case, so ``DATA.CSV`` and ``data.csv.gz`` are
        accepted), otherwise ``fileName + ".csv"``.
    """
    import os

    # Look only at the final path component: a directory such as
    # "exports.csv_old/" must not make an extension-less file name look complete.
    base_name = os.path.basename(fileName).lower()
    if ".csv" not in base_name:
        fileName += ".csv"
    return fileName
    

In [7]:
# Run the user-supplied path through readFile() so a missing ".csv" extension is added
# before loading. header=True takes the column names from the first row of the file;
# inferSchema=True lets Spark infer the column types from the data.
df = sparkLab.read.csv(readFile(fileName), header=True, inferSchema=True)

In [8]:
# Preview the freshly loaded rows (show() prints the first 20 rows by default).
df.show()

+--------+-----------+--------------------+--------------------+
|    Date|       Time|      Street_Address|          City_State|
+--------+-----------+--------------------+--------------------+
|7/1/2014| 8:27:00 PM|    622 THIRD AV ...|     M           ...|
|7/1/2014| 9:04:00 PM|     E 77TH ST   ...|     M           ...|
|7/1/2014|10:20:00 PM|    67 WEST PALIS...|    PALISADES PAR...|
|7/1/2014|12:28:00 PM|    130 MIDDLE NE...|    SANDS POINT L...|
|7/1/2014| 4:45:00 PM|    36 E 31ST ST ...|     M           ...|
|7/1/2014| 6:15:00 PM|    120 E 16TH ST...|     M           ...|
|7/1/2014| 8:55:00 PM|    300 FLATBUSH ...|     BK          ...|
|7/1/2014|10:00:00 PM|    410 PARK AV  ...|     M           ...|
|7/1/2014| 6:00:00 AM|    5 E 22ND ST  ...|     M           ...|
|7/1/2014| 5:21:00 AM|    2249 MORRIS A...|     BX          ...|
|7/1/2014| 6:15:00 AM|    1536 CROSBY A...|     BX          ...|
|7/1/2014| 8:40:00 AM|    3260 PERRY AV...|     BX          ...|
|7/1/2014|10:40:00 AM|   

In [9]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [10]:
def standardizeDate(df):
    """Swap the ``/`` separators in the ``Date`` column for ``-``.

    Only the separator characters change; the order of the date parts is
    left exactly as it was in the input.

    Returns:
        A new DataFrame whose ``Date`` column holds the rewritten text.
    """
    dashed_date = F.regexp_replace(F.col('Date'), "/", "-")
    return df.withColumn("Date", dashed_date)


df = standardizeDate(df)

In [11]:
 #If the date and time exist separately, join them in one column
def checkIfDateExistsSeparately(df):
    """Merge separate ``Date`` and ``Time`` columns into one ``Datetime`` column.

    When both columns are present they are concatenated as
    ``"<Date> <Time>"`` into a new ``Datetime`` column and the two source
    columns are dropped. When either of them is missing, a notice is printed
    and ``df`` is returned untouched.

    Args:
        df: DataFrame that may contain ``Date`` and ``Time`` columns.

    Returns:
        The merged DataFrame, or ``df`` itself when there is nothing to merge.
    """
    has_both = 'Date' in df.columns and 'Time' in df.columns
    if not has_both:
        # Report the situation whenever either column is absent, not only
        # when 'Date' is the missing one.
        print("The columns date and time don't exist separately")
        return df

    merged = df.withColumn(
        'Datetime',
        F.concat(F.col('Date'), F.lit(' '), F.col('Time')),
    )
    return merged.drop('Date', 'Time')
# Rebind df to the result: Date and Time are merged into Datetime when both are present.
df = checkIfDateExistsSeparately(df)

In [12]:
def removeTrailingAndLeadingSpaces(df):
    """Strip leading and trailing spaces from every string column of ``df``.

    Non-string columns are passed through untouched, so they keep their type
    instead of being implicitly cast to string by ``trim``. Column names and
    column order are preserved.

    All columns are rebuilt in a single ``select`` rather than with one
    ``withColumn`` call per column, which keeps the query plan flat no matter
    how many columns the frame has.

    Returns:
        A new DataFrame with the trimmed string columns.
    """
    trimmed_columns = [
        # df.dtypes yields (column name, type name) pairs; "string" marks text columns.
        F.trim(F.col(name)).alias(name) if dtype == "string" else F.col(name)
        for name, dtype in df.dtypes
    ]
    return df.select(trimmed_columns)


df = removeTrailingAndLeadingSpaces(df)

In [13]:
def createColumnsAndSelect(df):
    """Add the ``Source``, ``City`` and ``State`` columns and fix the column order.

    ``Source`` is the constant ``"SKYLINE"``; ``City`` and ``State`` start out
    as a single space and are filled in by later cells. The result keeps only
    the six working columns, in the order listed below.
    """
    working_columns = ['Datetime', 'Street_Address', 'City', 'City_State', 'State', 'Source']
    return (
        df.withColumn("Source", F.lit("SKYLINE"))
        .withColumn("City", F.lit(" "))
        .withColumn("State", F.lit(" "))
        .select(working_columns)
    )


df = createColumnsAndSelect(df)

In [14]:
# Check the frame after trimming and after Date/Time were merged into Datetime.
df.show()

+--------------------+--------------------+----+-----------------+-----+-------+
|            Datetime|      Street_Address|City|       City_State|State| Source|
+--------------------+--------------------+----+-----------------+-----+-------+
| 7-1-2014 8:27:00 PM|        622 THIRD AV|    |                M|     |SKYLINE|
| 7-1-2014 9:04:00 PM|           E 77TH ST|    |                M|     |SKYLINE|
|7-1-2014 10:20:00 PM|67 WEST PALISADES...|    |PALISADES PARK NJ|     |SKYLINE|
|7-1-2014 12:28:00 PM|  130 MIDDLE NECK RD|    |   SANDS POINT LI|     |SKYLINE|
| 7-1-2014 4:45:00 PM|        36 E 31ST ST|    |                M|     |SKYLINE|
| 7-1-2014 6:15:00 PM|       120 E 16TH ST|    |                M|     |SKYLINE|
| 7-1-2014 8:55:00 PM|     300 FLATBUSH AV|    |               BK|     |SKYLINE|
|7-1-2014 10:00:00 PM|         410 PARK AV|    |                M|     |SKYLINE|
| 7-1-2014 6:00:00 AM|         5 E 22ND ST|    |                M|     |SKYLINE|
| 7-1-2014 5:21:00 AM|      

In [15]:
def fixStreetAbbv(df):
    """Normalise the street-type word at the very end of ``Street_Address``.

    Each rule is a regular expression anchored to the end of the string, so
    only the last word of the address is rewritten (e.g. ``" AV"`` becomes
    ``" AVE"``, ``" STREET"`` becomes ``" ST"``). The rules are applied in the
    listed order inside a single projection.

    Returns:
        A new DataFrame with the rewritten ``Street_Address`` column.
    """
    # (pattern, replacement) pairs; the " HY$" rule is listed once because
    # applying it a second time can never match again.
    suffix_rules = [
        (" BL$", " BLVD"),
        (" AV$", " AVE"),
        (" AVENUE$", " AVE"),
        (" COURT$", " CT"),
        (" STREET$", " ST"),
        (" LANE$", " LN"),
        (" ROAD$", " RD"),
        (" DRIVE$", " DR"),
        (" HY$", " HWY"),
        (" PZ$", " PLZ"),
        (" TP$", " TPKE"),
    ]

    address = F.col('Street_Address')
    for pattern, replacement in suffix_rules:
        address = F.regexp_replace(address, pattern, replacement)
    return df.withColumn('Street_Address', address)


df = fixStreetAbbv(df)

In [16]:
def fixTheNonNYAddress(a, b):
    """Append the locality name to a street address for NJ/CT/LI/PA rows.

    Args:
        a: Street address text.
        b: ``City_State`` text. For the handled rows it is a locality name
            followed by a two-letter code, e.g. ``"PALISADES PARK NJ"``.

    Returns:
        ``a + " " + b[:-2]`` when ``b`` ends in ``NJ``, ``CT``, ``LI`` or
        ``PA``; otherwise ``a`` unchanged. ``a`` is also returned unchanged
        (possibly ``None``) when either argument is ``None``.
    """
    # SQL NULLs reach a Python UDF as None; slicing or concatenating None
    # would raise TypeError and fail the whole Spark job.
    if a is None or b is None:
        return a

    if b[-2:] in ('NJ', 'CT', 'LI', 'PA'):
        # b[:-2] keeps everything before the code, including the separating
        # space, so the result can end in a trailing space.
        a = a + " " + b[:-2]

    return a
# Wrap the plain-Python helper as a Spark UDF that returns a string, then rebuild
# Street_Address row by row from the (Street_Address, City_State) pair.
ny_udf = F.udf(fixTheNonNYAddress, T.StringType())
df = df.withColumn('Street_Address',ny_udf('Street_Address', 'City_State'))

In [17]:
def separateState(df):
    """Copy the last two characters of ``City_State`` into the ``City`` column.

    Note that, despite the function name, the extracted code is stored in
    ``City`` (not ``State``); later cells read the code from there.
    """
    trailing_code = F.substring(F.col('City_State'), -2, 2)
    return df.withColumn('City', trailing_code)


df = separateState(df)

In [18]:
# Trim again: the address UDF above can leave a trailing space on Street_Address.
df = removeTrailingAndLeadingSpaces(df)

In [19]:
def defineState(df):
    """Fill ``State`` from the code currently held in the ``City`` column.

    Codes M, QU, BX, CA, ST, WE, BK, FK and AG map to ``NEW YORK``; PA, CT and
    LI map to ``PENNSYLVANIA``, ``CONNECTICUT`` and ``LONG ISLAND``; NJ and WK
    map to ``NEW JERSEY``. Rows with any other code keep their current
    ``State`` value.
    """
    code = F.col('City')
    new_york_codes = ['M', 'QU', 'BX', 'CA', 'ST', 'WE', 'BK', 'FK', 'AG']
    new_jersey_codes = ['NJ', 'WK']

    # The code sets are disjoint, so one chained expression gives the same
    # result as updating the column once per state.
    state = (
        F.when(code.isin(new_york_codes), 'NEW YORK')
        .when(code == 'PA', 'PENNSYLVANIA')
        .when(code == 'CT', 'CONNECTICUT')
        .when(code == 'LI', 'LONG ISLAND')
        .when(code.isin(new_jersey_codes), 'NEW JERSEY')
        .otherwise(F.col('State'))
    )
    return df.withColumn('State', state)


df = defineState(df)

In [20]:
def cityName(df):
    """Translate the code held in ``City`` into a city name, or blank it.

    Codes NJ, LI, PA, CT and WE become the empty string; M, BK, BX and QU
    become MANHATTAN, BROOKLYN, BRONX and QUEENS. Every other value is left
    as it is.

    Codes are compared for exact equality. A pattern replacement of "M" would
    also fire inside other values (for example "AM" would turn into
    "AMANHATTAN"), which is why no regular expression is used here.

    Returns:
        A new DataFrame with the rewritten ``City`` column.
    """
    code_to_city = [
        ('NJ', ''),
        ('LI', ''),
        ('PA', ''),
        ('CT', ''),
        ('WE', ''),
        ('M', 'MANHATTAN'),
        ('BK', 'BROOKLYN'),
        ('BX', 'BRONX'),
        ('QU', 'QUEENS'),
    ]

    code = F.col('City')
    first_code, first_city = code_to_city[0]
    city = F.when(code == first_code, first_city)
    for known_code, city_name in code_to_city[1:]:
        city = city.when(code == known_code, city_name)

    # Unknown codes (and NULLs) fall through unchanged.
    return df.withColumn('City', city.otherwise(code))


df = cityName(df)

In [21]:
def fixTheAirportAddress(df):
    """Give rows whose ``City_State`` is NWK, JFK or LAG a fixed street address and locality.

    Three passes are made, in this order:
      1. ``Street_Address`` is overwritten with a fixed address where
         ``City_State`` equals one of the three codes.
      2. The codes inside ``City_State`` are replaced by locality names.
      3. The two-letter remnants FK, WK and AG inside ``City`` are replaced by
         the same locality names.
    """
    # Pass 1 must run before pass 2, which rewrites the codes it matches on.
    fixed_streets = [
        ('NWK', '3 BREWSTER'),
        ('JFK', '148-18 134TH ST'),
        ('LAG', '102-05 DITMARS BLVD'),
    ]
    for airport_code, street in fixed_streets:
        is_airport = F.col('City_State') == airport_code
        df = df.withColumn(
            'Street_Address',
            F.when(is_airport, street).otherwise(F.col('Street_Address')),
        )

    city_state_names = [
        ("NWK", "NEWARK"),
        ("JFK", " JAMAICA"),
        ("LAG", " EAST ELMHURST"),
    ]
    for airport_code, locality in city_state_names:
        df = df.withColumn(
            'City_State',
            F.regexp_replace(F.col('City_State'), airport_code, locality),
        )

    city_names = [
        ("FK", "JAMAICA"),
        ("WK", "NEWARK"),
        ("AG", "EAST ELMHURST"),
    ]
    for short_code, locality in city_names:
        df = df.withColumn(
            'City',
            F.regexp_replace(F.col('City'), short_code, locality),
        )

    return df


df = fixTheAirportAddress(df)

In [22]:
def fixTheCityWithNonNYState(df):
    """Blank out ``City`` on rows whose ``State`` is one of the four non-NY values.

    Rows with ``State`` equal to NEW JERSEY, CONNECTICUT, PENNSYLVANIA or
    LONG ISLAND get an empty-string ``City``; all other rows keep theirs.
    """
    outside_new_york = F.col('State').isin(
        'NEW JERSEY', 'CONNECTICUT', 'PENNSYLVANIA', 'LONG ISLAND'
    )
    blanked_city = F.when(outside_new_york, '').otherwise(F.col('City'))
    return df.withColumn('City', blanked_city)

In [23]:
# Trim again: the airport replacements above insert values that start with a space.
df = removeTrailingAndLeadingSpaces(df)

In [24]:
# Clear City on the rows whose State is NEW JERSEY, CONNECTICUT, PENNSYLVANIA or LONG ISLAND.
df = fixTheCityWithNonNYState(df)

In [25]:
# Keep only the output columns; the intermediate City_State column is dropped here.
df = df.select(['Datetime', 'Street_Address', 'City', 'State', 'Source'])

In [26]:
# Check the result once City and State have been derived and City_State removed.
df.show()

+--------------------+--------------------+---------+-----------+-------+
|            Datetime|      Street_Address|     City|      State| Source|
+--------------------+--------------------+---------+-----------+-------+
| 7-1-2014 8:27:00 PM|       622 THIRD AVE|MANHATTAN|   NEW YORK|SKYLINE|
| 7-1-2014 9:04:00 PM|           E 77TH ST|MANHATTAN|   NEW YORK|SKYLINE|
|7-1-2014 10:20:00 PM|67 WEST PALISADES...|         | NEW JERSEY|SKYLINE|
|7-1-2014 12:28:00 PM|130 MIDDLE NECK R...|         |LONG ISLAND|SKYLINE|
| 7-1-2014 4:45:00 PM|        36 E 31ST ST|MANHATTAN|   NEW YORK|SKYLINE|
| 7-1-2014 6:15:00 PM|       120 E 16TH ST|MANHATTAN|   NEW YORK|SKYLINE|
| 7-1-2014 8:55:00 PM|    300 FLATBUSH AVE| BROOKLYN|   NEW YORK|SKYLINE|
|7-1-2014 10:00:00 PM|        410 PARK AVE|MANHATTAN|   NEW YORK|SKYLINE|
| 7-1-2014 6:00:00 AM|         5 E 22ND ST|MANHATTAN|   NEW YORK|SKYLINE|
| 7-1-2014 5:21:00 AM|     2249 MORRIS AVE|    BRONX|   NEW YORK|SKYLINE|
| 7-1-2014 6:15:00 AM|     1536 CROSBY

In [27]:
def fixStreetName(df):
    """Rewrite spelled-out ordinals in ``Street_Address`` as numerals.

    FIRST through TWELFTH become 1ST through 12TH (FOURTH -> 4TH included).
    Each word is matched with ``\\b`` word boundaries, so only whole words are
    rewritten and an ordinal embedded in a longer word is left alone.

    Returns:
        A new DataFrame with the rewritten ``Street_Address`` column.
    """
    ordinal_words = [
        ("FIRST", "1ST"),
        ("SECOND", "2ND"),
        ("THIRD", "3RD"),
        ("FOURTH", "4TH"),
        ("FIFTH", "5TH"),
        ("SIXTH", "6TH"),
        ("SEVENTH", "7TH"),
        ("EIGHTH", "8TH"),
        ("NINTH", "9TH"),
        ("TENTH", "10TH"),
        ("ELEVENTH", "11TH"),
        ("TWELFTH", "12TH"),
    ]

    # Build one nested expression so all replacements run in a single projection.
    address = F.col('Street_Address')
    for word, numeral in ordinal_words:
        address = F.regexp_replace(address, r"\b" + word + r"\b", numeral)

    return df.withColumn('Street_Address', address)

In [28]:
# Replace spelled-out ordinals in Street_Address (e.g. THIRD -> 3RD).
df = fixStreetName(df)

In [29]:
def capitalizeColulmns(df):
    """Return ``df`` with every column name converted to upper case.

    Only the names change; column order and the data are left as they are.
    """
    upper_names = [name.upper() for name in df.columns]
    return df.toDF(*upper_names)


df = capitalizeColulmns(df)

In [30]:
# Final whitespace clean-up on all columns (now under their upper-case names).
df = removeTrailingAndLeadingSpaces(df)

In [31]:
# Inspect the final, standardized frame before it is written out.
df.show()

+--------------------+--------------------+---------+-----------+-------+
|            DATETIME|      STREET_ADDRESS|     CITY|      STATE| SOURCE|
+--------------------+--------------------+---------+-----------+-------+
| 7-1-2014 8:27:00 PM|         622 3RD AVE|MANHATTAN|   NEW YORK|SKYLINE|
| 7-1-2014 9:04:00 PM|           E 77TH ST|MANHATTAN|   NEW YORK|SKYLINE|
|7-1-2014 10:20:00 PM|67 WEST PALISADES...|         | NEW JERSEY|SKYLINE|
|7-1-2014 12:28:00 PM|130 MIDDLE NECK R...|         |LONG ISLAND|SKYLINE|
| 7-1-2014 4:45:00 PM|        36 E 31ST ST|MANHATTAN|   NEW YORK|SKYLINE|
| 7-1-2014 6:15:00 PM|       120 E 16TH ST|MANHATTAN|   NEW YORK|SKYLINE|
| 7-1-2014 8:55:00 PM|    300 FLATBUSH AVE| BROOKLYN|   NEW YORK|SKYLINE|
|7-1-2014 10:00:00 PM|        410 PARK AVE|MANHATTAN|   NEW YORK|SKYLINE|
| 7-1-2014 6:00:00 AM|         5 E 22ND ST|MANHATTAN|   NEW YORK|SKYLINE|
| 7-1-2014 5:21:00 AM|     2249 MORRIS AVE|    BRONX|   NEW YORK|SKYLINE|
| 7-1-2014 6:15:00 AM|     1536 CROSBY

In [None]:
# Persist a 100-row sample. Spark creates a *directory* with this name that holds the
# part file(s), not a single CSV file.
# header=True keeps the column names in the output; mode="overwrite" replaces the output
# of a previous run, so re-running the notebook does not fail because the path exists.
df.limit(100).write.csv("standardizedCSVSkyline.csv", header=True, mode="overwrite")

In [None]:
# df.coalesce(1).limit(100).write.save(path='dial7_complete.csv', format='csv', mode='append')