In [1]:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Raw crime records, read through the spark-csv reader with the header and
# inferschema options set to "true".
crime_df = (
    sqlContext.read.format("com.databricks.spark.csv")
    .option("inferschema", "true")
    .option("header", "true")
    .load("/FileStore/tables/crime.csv")
)

# Raw climate observations, read with the same reader options.
climate_df = (
    sqlContext.read.format("com.databricks.spark.csv")
    .option("inferschema", "true")
    .option("header", "true")
    .load("/FileStore/tables/dataset/1144508.csv")
)

In [2]:
# Number of columns in the climate DataFrame.
len(climate_df.columns)

In [3]:
  # Show the distinct values of the LAW_CAT_CD column.
  crime_df.select("LAW_CAT_CD").distinct().show()

In [4]:
# Count the distinct values of the OFNS_DESC column.
crime_df.select("OFNS_DESC").distinct().count()

In [5]:
# Sort by CMPLNT_FR_DT, drop rows that contain nulls, and show the result.
# NOTE(review): if CMPLNT_FR_DT was inferred as a string, this sort is
# lexicographic rather than chronological - confirm the inferred type.
crime_df.sort("CMPLNT_FR_DT").na.drop().show()

In [6]:
# Date conversion

from pyspark.sql.functions import udf


# Keep only the crime columns used by the date normalisation and later joins.
crime_df_date = crime_df.select(
    "CMPLNT_FR_DT",
    "CMPLNT_FR_TM",
    "OFNS_DESC",
    "LAW_CAT_CD",
    "Latitude",
    "Longitude",
    "PD_DESC"
)

def crime_date_convert(date, time):
    """Build the hour-resolution key used to join crime rows with climate rows.

    The "/"-separated fields of ``date`` are re-joined with "-" in the order
    (third and later, second, first), then a space and the part of ``time``
    before its first ":" are appended.

    NOTE(review): for an MM/DD/YYYY input this yields YYYY-DD-MM rather than
    YYYY-MM-DD - confirm the field order against the climate DATE values this
    key is joined with.

    Args:
        date: complaint date string with "/"-separated fields.
        time: complaint time string; only the text before the first ":" is used.

    Returns:
        The normalized "<date> <hour>" string, or the placeholder
        "2021-01-01 00" when either input cannot be processed (for example a
        null, which Spark passes to a Python UDF as None).
    """
    # Placeholder kept from the original implementation so that unparseable
    # rows still receive a string key.
    fallback = "2021-01-01 00"
    try:
        parts = date.split("/")
        full_date = "-".join(parts[2:] + parts[1:2] + parts[0:1])
        return full_date + " " + time.split(":")[0]
    except Exception:
        # Best effort: a bad row maps to the placeholder instead of failing
        # the job. Catching Exception (not a bare except) lets
        # KeyboardInterrupt and SystemExit propagate.
        return fallback
  


# Expose the Python converter to Spark as a string-returning UDF.
udf_crime_date_convert = udf(crime_date_convert, StringType())

# Attach the join key, then keep it together with the descriptive columns.
with_crime_date_conversion = (
    crime_df_date
    .withColumn(
        "normalized_date",
        udf_crime_date_convert("CMPLNT_FR_DT", "CMPLNT_FR_TM")
    )
    .select(
        "normalized_date",
        "OFNS_DESC",
        "LAW_CAT_CD",
        "Latitude",
        "Longitude",
        "PD_DESC"
    )
)

# Climate columns needed for the join: the timestamp and HOURLYDRYBULBTEMPC.
climate_df_date = climate_df.select("DATE", "HOURLYDRYBULBTEMPC")



In [7]:
def climate_date_convert(date):
    """Truncate a climate timestamp string to the text before its first ":".

    Args:
        date: value of the climate DATE column; None for a null cell.

    Returns:
        The text preceding the first ":" (the whole string when it contains
        no ":"), or None when ``date`` is None.
    """
    if date is None:
        # Spark hands nulls to Python UDFs as None; propagate the null instead
        # of raising AttributeError and failing the whole job.
        return None
    return date.split(":")[0]

# Expose the climate timestamp truncation to Spark as a string-returning UDF.
udf_climate_date_convert = udf(climate_date_convert, StringType())

# Derive the join key from DATE and keep it alongside HOURLYDRYBULBTEMPC.
climate_with_date_conversion = (
    climate_df_date
    .withColumn("normalized_date", udf_climate_date_convert("DATE"))
    .select("normalized_date", "HOURLYDRYBULBTEMPC")
)

# Join climate rows to crime rows on the shared normalized_date column.
join_climate_crime = climate_with_date_conversion.join(
    with_crime_date_conversion,
    "normalized_date"
)

In [8]:
# Drop every joined row that contains a null in any column.
cleaned_join_climate_crime = join_climate_crime.na.drop()

def get_hour(date):
    """Return the second space-separated field of ``date``.

    Raises IndexError when ``date`` contains no space.
    """
    fields = date.split(" ")
    return fields[1]

def get_date(date):
    """Return the text of ``date`` before its first space.

    The whole string is returned when it contains no space.
    """
    head, _sep, _rest = date.partition(" ")
    return head


# String-returning Spark UDF wrappers around the two splitting helpers.
udf_get_hour = udf(get_hour, StringType())
udf_get_date = udf(get_date, StringType())

# Split normalized_date into separate "date" and "hour" columns.
with_refined_date_columns = (
    cleaned_join_climate_crime
    .withColumn("date", udf_get_date("normalized_date"))
    .withColumn("hour", udf_get_hour("normalized_date"))
)

In [9]:
import requests
from pyspark.sql.types import StringType

def get_pincode(lat, lng):
    """Reverse-geocode a coordinate pair to a postal code via the Google Geocoding API.

    The API key is read from the ``GOOGLE_MAPS_API_KEY`` environment variable
    rather than being embedded in the notebook. The variable must be visible
    to the process that executes this function (for a Spark UDF, the Python
    worker process).

    Args:
        lat: latitude of the point.
        lng: longitude of the point.

    Returns:
        The ``long_name`` of the first address component whose ``types`` is
        exactly ``["postal_code"]``, as a str; None when no such component is
        returned or when the request or response parsing fails.

    Raises:
        RuntimeError: if ``GOOGLE_MAPS_API_KEY`` is not set. This is a
            configuration error, not a per-row lookup failure, so it is not
            swallowed.
    """
    import os  # local import: the notebook's import cells do not import os

    api_key = os.environ.get("GOOGLE_MAPS_API_KEY")
    if not api_key:
        raise RuntimeError(
            "GOOGLE_MAPS_API_KEY is not set; cannot call the geocoding API"
        )

    # Let requests build and URL-encode the query string.
    query = {
        "key": api_key,
        "latlng": "{lat},{lon}".format(lat=lat, lon=lng),
        "sensor": "true",
    }
    try:
        response = requests.get(
            "https://maps.googleapis.com/maps/api/geocode/json",
            params=query,
            timeout=10,  # requests has no default timeout; avoid hanging a task
        ).json()
        for result in response["results"]:
            for component in result["address_components"]:
                if component["types"] == ["postal_code"]:
                    return str(component["long_name"])
        return None
    except Exception:
        # Best effort per coordinate: network, JSON, or schema problems give
        # None instead of failing the whole job.
        return None


# Wrap get_pincode as a Spark UDF that returns a string column.
udf_getZip = udf(get_pincode, StringType())

In [10]:
# One row per (Latitude, Longitude) pair with its row count, plus the zipcode
# looked up for that pair through the geocoding UDF.
with_zipCode = (
    with_refined_date_columns
    .groupBy("Latitude", "Longitude")
    .count()
    .withColumn("zipcode", udf_getZip("Latitude", "Longitude"))
)

In [11]:
# Load the withZipcode.csv table (header row, inferred schema).
zipcodeData = (
    sqlContext.read.format("com.databricks.spark.csv")
    .option("inferschema", "true")
    .option("header", "true")
    .load("/FileStore/tables/withZipcode.csv")
)

In [12]:
# Attach a zipcode to every crime/climate row by matching on both coordinates,
# then keep the columns used for visualisation. Latitude and Longitude exist on
# both sides, so they are selected through the right-hand DataFrame.
dataToVisualize = (
    zipcodeData
    .join(
        with_refined_date_columns,
        [
            with_refined_date_columns.Latitude == zipcodeData.Latitude,
            with_refined_date_columns.Longitude == zipcodeData.Longitude,
        ]
    )
    .select(
        with_refined_date_columns.Latitude,
        with_refined_date_columns.Longitude,
        "normalized_date",
        "OFNS_DESC",
        "LAW_CAT_CD",
        "PD_DESC",
        zipcodeData.zipcode,
        "HOURLYDRYBULBTEMPC",
        "date",
        "hour"
    )
)

In [13]:
  df = (
      # Load dataToVis.csv (header row, inferred schema).
      sqlContext.read.format("com.databricks.spark.csv")
      .option("inferschema", "true")
      .option("header", "true")
      .load("/FileStore/dataToVis.csv")
  )

In [14]:
display(crimeWithWeather)


In [15]:
crimeWithWeather=df.sort("HOURLYDRYBULBTEMPC")


In [16]:
# Load HotelData.csv (header row, inferred schema).
hotel = (
    sqlContext.read.format("com.databricks.spark.csv")
    .option("inferschema", "true")
    .option("header", "true")
    .load("/FileStore/tables/HotelData.csv")
)

In [17]:
# Pair each hotel with the crime rows that share its zip code, keep the
# listed columns, and drop rows containing nulls.
hotelWithCrime = (
    hotel
    .join(crimeWithWeather, hotel.zipCode == crimeWithWeather.zipcode)
    .select(hotel.zipCode, "Name", "LAW_CAT_CD", "OFNS_DESC", "normalized_date")
    .na.drop()
)

In [18]:
# List the column names of the hotel DataFrame.
hotel.columns

In [19]:
# Render the hotel/crime join result.
display(hotelWithCrime)

In [20]:
# Load the Liquor Authority active-licenses CSV (header row, inferred schema).
bars = (
    sqlContext.read.format("com.databricks.spark.csv")
    .option("inferschema", "true")
    .option("header", "true")
    .load("/FileStore/tables/Liquor_Authority_Quarterly_List_of_Active_Licenses.csv")
)


In [21]:
# List the column names of the bars DataFrame.
bars.columns

In [22]:
# Pair each licensed premises with the crime rows that share its zip code,
# keep the premises' own coordinates plus the crime descriptors, and drop rows
# containing nulls.
barsWithCrime = (
    bars
    .join(crimeWithWeather, bars.Zip == crimeWithWeather.zipcode)
    .select(
        bars.Zip,
        "Premises Name",
        bars.Latitude,
        bars.Longitude,
        "LAW_CAT_CD",
        "OFNS_DESC",
        "normalized_date"
    )
    .na.drop()
)

In [23]:
# Render the bars/crime join result.
display(barsWithCrime)

In [24]:
# NOTE(review): this rebinds `bars`, which an earlier cell loaded from the
# liquor-license CSV, and the path begins with /tables/FileStore while every
# other load in this notebook uses /FileStore/... - confirm both are intended.
bars = (
    sqlContext.read.format("com.databricks.spark.csv")
    .option("inferschema", "true")
    .option("header", "true")
    .load("/tables/FileStore/abc.csv")
)


In [25]:
# Render the crime DataFrame.
display(crime_df)

In [26]:
# Render the climate DataFrame.
display(climate_df)