# Initial PySpark Implementation

### Imports

In [55]:
import py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
plt.style.use("ggplot")
%matplotlib inline
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from sklearn.model_selection import train_test_split
from haversine import haversine
from itertools import izip
import pyspark as ps
from pyspark.sql.types import *
from pyspark.sql.types import StructType
from pyspark.sql.functions import struct, udf, col, monotonically_increasing_id



### Spark Session and Initial Variables

In [56]:
spark = ps.sql.SparkSession\
            .builder\
            .master("local[4]")\
            .appName("Spark_EDA")\
            .getOrCreate()
            
sc = spark.sparkContext

In [57]:
schema = StructType([
    StructField("lat", FloatType(), True),
    StructField("lon", FloatType(), True),
    StructField("id", StringType(), True),
    StructField("source_id", StringType(), True),
    StructField("account_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("created_on", StringType(), True),
    StructField("updated_on", StringType(), True),
    StructField("start_ts", IntegerType(), True),
    StructField("until_ts", StringType(), True),
    StructField("report_type", StringType(), True),
    StructField("notes", StringType(), True),
    StructField("layer_id", StringType(), True),
    StructField("severity", StringType(), True)
])

target_columns = [
    "lat",
    "lon",
    "id",
    "title",
    "start_ts",
    "report_type",
    "notes",
    "severity"
]

### Shell command to clean quoted newlines

awk -v RS='"' 'NR % 2 == 0 { gsub(/\n/, " ") } { printf("%s%s", $0, RT) }' input_file > output_file

In [58]:
data_filepath = "../data/reports_12DEC16-26DEC16.tsv"

### Load Data

In [59]:
def severity_score(severity_rating):
    if severity_rating == "low":
        return 1
    elif severity_rating == "moderate":
        return 2
    elif severity_rating == "medium":
        return 3
    elif severity_rating == "high":
        return 4
    elif severity_rating == "extreme":
        return 5
    else:
        return 2
    
def severity_score_quadratic(severity_rating):
    if severity_rating == "low":
        return 1
    elif severity_rating == "moderate":
        return 4
    elif severity_rating == "medium":
        return 9
    elif severity_rating == "high":
        return 16
    elif severity_rating == "extreme":
        return 25
    else:
        return 4
    
def severity_score_log(severity_rating):
    if severity_rating == "low":
        return np.log(1)
    elif severity_rating == "moderate":
        return np.log(2)
    elif severity_rating == "medium":
        return np.log(3)
    elif severity_rating == "high":
        return np.log(4)
    elif severity_rating == "extreme":
        return np.log(5)
    else:
        return np.log(2)
    
def severity_score_exp(severity_rating):
    if severity_rating == "low":
        return np.exp(1)
    elif severity_rating == "moderate":
        return np.exp(2)
    elif severity_rating == "medium":
        return np.exp(3)
    elif severity_rating == "high":
        return np.exp(4)
    elif severity_rating == "extreme":
        return np.exp(5)
    else:
        return np.exp(2)

In [60]:
severity_score_udf = udf(lambda severity: 
                        severity_score(severity), 
                        FloatType()
                    )

severity_quadratic_udf = udf(lambda severity: 
                        severity_score_quadratic(severity), 
                        FloatType()
                    )

severity_logarithmic_udf = udf(lambda severity: 
                        severity_score_log(severity), 
                        FloatType()
                    )

severity_exponential_udf = udf(lambda severity: 
                        severity_score_exp(severity), 
                        FloatType()
                    )

In [67]:
# Load Data
reports_df = spark.read.csv(data_filepath,
                         sep="\t",
                         schema=schema,
                         header=None,
                         quote='"')

# Drop Nulls - will revisit
reports_df = reports_df.dropna()

# Keep only target columns
reports_df = reports_df.select([column for column in target_columns])

# Add index column
reports_df = reports_df.withColumn("index", monotonically_increasing_id())

# Create Lat/Long columns
reports_df = reports_df.withColumn(
    "lat_long",
    struct(reports_df.lat, reports_df.lon))

# Create Severity Features columns

# reports_df = reports_df.withColumn(
#     "severity_score",
#     severity_score_udf(col("severity"))
# )

# reports_df = reports_df.withColumn(
#     "severity_quadratic",
#     severity_quadratic_udf(col("severity"))
# )

# reports_df = reports_df.withColumn(
#     "severity_log",
#     severity_logarithmic_udf(col("severity"))
# )

# reports_df = reports_df.withColumn(
#     "severity_exp",
#     severity_exponential_udf(col("severity"))
# )


# Convert Timestamps to date time groups




In [68]:
reports_df.printSchema()
reports_df.show(1)

root
 |-- lat: float (nullable = true)
 |-- lon: float (nullable = true)
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- start_ts: integer (nullable = true)
 |-- report_type: string (nullable = true)
 |-- notes: string (nullable = true)
 |-- severity: string (nullable = true)
 |-- index: long (nullable = false)
 |-- lat_long: struct (nullable = false)
 |    |-- lat: float (nullable = true)
 |    |-- lon: float (nullable = true)

+--------+--------+--------------------+--------------------+----------+-----------+--------------------+--------+-----+-------------------+
|     lat|     lon|                  id|               title|  start_ts|report_type|               notes|severity|index|           lat_long|
+--------+--------+--------------------+--------------------+----------+-----------+--------------------+--------+-----+-------------------+
|37.98381|23.72754|KFg4bXSq5hGyhPkeG...|Security Message ...|1479132770|       OSAC|The U.S. Embassy ...| unrated|  

## Label Reports with Cities

### Load Cities Data and Build Lat/Long Column

In [91]:
cities_schema = StructType([
        StructField("name", StringType(), True),
        StructField("latitude", FloatType(), True),
        StructField("longitude", FloatType(), True),
        StructField("country_code", StringType(), True)
    ])

In [92]:
cities_df = spark.read.csv("../data/cities300000.csv", header=True, schema=cities_schema)

cities_df = cities_df.withColumn(
    "lat_long", 
    struct(cities_df.latitude, cities_df.longitude)
)
cities_df = cities_df.withColumn(
    "index",
    monotonically_increasing_id()
)

cities_df.persist()
cities_df.show(5)


+--------------+--------+---------+------------+-------------------+-----+
|          name|latitude|longitude|country_code|           lat_long|index|
+--------------+--------+---------+------------+-------------------+-----+
|         Dubai| 25.0657| 55.17128|          AE| [25.0657,55.17128]|    0|
|       Sharjah|25.33737| 55.41206|          AE|[25.33737,55.41206]|    1|
|        Al Ain|24.19167| 55.76056|          AE|[24.19167,55.76056]|    2|
|     Abu Dhabi|24.46667| 54.36667|          AE|[24.46667,54.36667]|    3|
|Mazār-e Sharīf|36.70904| 67.11087|          AF|[36.70904,67.11087]|    4|
+--------------+--------+---------+------------+-------------------+-----+
only showing top 5 rows



### Calculate Haversine Distances

In [93]:
for city in cities_df.select("lat_long").toLocalIterator():
    print tuple(city[0])
    break

(25.06570053100586, 55.17127990722656)


In [94]:
city_label_indices = []
for report in reports_df.select("lat_long").toLocalIterator():
    distances = [
        haversine(tuple(report[0]), tuple(city[0])) 
        for city in cities_df.select("lat_long").toLocalIterator()
    ]
    city_label_indices.append(np.argmin(distances))
    
print len(city_label_indices)
print reports_df.count()

error: [Errno 54] Connection reset by peer

### Apply City Labels

In [105]:
city_label_indices = pd.read_csv("../data/city_label_indices.csv", header=None)
city_label_indices = city_label_indices[1].values

In [107]:
city_labels = []
for index in city_label_indices:
    result = cities_df.where(cities_df.index == index).select("name").first()
    city_labels.append(result[0])

Exception: could not open socket

In [108]:
print len(city_label_indices)
print len(city_labels)
print reports_df.count()

98997
7641
98997


## Time Series Analysis

Show several cities, then Berlin


## Conclusions