In [1]:
import atexit
import os
import platform
import py4j
import pyspark
from operator import itemgetter
import csv, time, sys
from datetime import datetime, timedelta, date
import pandas as pd
import statsmodels.api as sm
import numpy as np
import matplotlib.pyplot as plt
import json

from sklearn.cluster import DBSCAN
from geopy.distance import great_circle
from shapely.geometry import MultiPoint
import numpy as np

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, HiveContext
from pyspark import SparkFiles
from pyspark.storagelevel import StorageLevel
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.rdd import RDD

In [4]:
# Directory holding the input CSV files used throughout the notebook.
path = "./data/"

# `sc` and `sqlContext` are used by later cells but are only pre-defined when the
# kernel is launched through `pyspark`. Create them if missing so the notebook
# also survives Restart & Run All on a plain Python kernel.
if 'sc' not in globals():
    sc = SparkContext.getOrCreate()
if 'sqlContext' not in globals():
    sqlContext = SQLContext(sc)

In [5]:
# Load the pokemon table into pandas and preview its first two rows.
pandas_df = pd.read_csv(path+"pokemon.csv",sep=",")
pandas_df.head(2)

Unnamed: 0,id,identifier,species_id,height,weight,base_experience,order,is_default
0,1,bulbasaur,1,7,69,64,1,1
1,2,ivysaur,2,10,130,142,2,1


In [6]:
# Convert the pandas frame into a Spark DataFrame and preview two rows.
# `sqlContext` must already exist in the kernel (e.g. one started via `pyspark`).
sparkDF = sqlContext.createDataFrame(pandas_df)
sparkDF.show(2)

+---+----------+----------+------+------+---------------+-----+----------+
| id|identifier|species_id|height|weight|base_experience|order|is_default|
+---+----------+----------+------+------+---------------+-----+----------+
|  1| bulbasaur|         1|     7|    69|             64|    1|         1|
|  2|   ivysaur|         2|    10|   130|            142|    2|         1|
+---+----------+----------+------+------+---------------+-----+----------+
only showing top 2 rows



In [7]:
# Row count of the Spark DataFrame.
sparkDF.count()

811

In [8]:
# pandas .count() returns non-null counts per column, not a single row count like Spark's.
pandas_df.count()

id                 811
identifier         811
species_id         811
height             811
weight             811
base_experience    811
order              811
is_default         811
dtype: int64

In [9]:
# Spark returns the column names as a plain Python list.
sparkDF.columns

['id',
 'identifier',
 'species_id',
 'height',
 'weight',
 'base_experience',
 'order',
 'is_default']

In [10]:
# pandas returns the same column names as an Index object.
pandas_df.columns

Index([u'id', u'identifier', u'species_id', u'height', u'weight',
       u'base_experience', u'order', u'is_default'],
      dtype='object')

In [11]:
# Per-column pandas dtypes.
pandas_df.dtypes

id                  int64
identifier         object
species_id          int64
height              int64
weight              int64
base_experience     int64
order               int64
is_default          int64
dtype: object

In [12]:
# Spark (name, type) pairs: pandas int64 became bigint and object became string.
sparkDF.dtypes

[('id', 'bigint'),
 ('identifier', 'string'),
 ('species_id', 'bigint'),
 ('height', 'bigint'),
 ('weight', 'bigint'),
 ('base_experience', 'bigint'),
 ('order', 'bigint'),
 ('is_default', 'bigint')]

In [13]:
# pandas summary statistics, including the 25%/50%/75% quartiles.
pandas_df[['height','weight']].describe()

Unnamed: 0,height,weight
count,811.0,811.0
mean,12.516646,636.096178
std,12.266613,1077.65871
min,1.0,1.0
25%,6.0,94.5
50%,10.0,295.0
75%,15.0,650.0
max,145.0,9997.0


In [14]:
# Spark's describe() reports only count, mean, stddev, min and max (no quartiles).
sparkDF.select("height","weight").describe().show()

+-------+------------------+------------------+
|summary|            height|            weight|
+-------+------------------+------------------+
|  count|               811|               811|
|   mean|12.516646115906289| 636.0961775585697|
| stddev|12.266612584292778|1077.6587096303547|
|    min|                 1|                 1|
|    max|               145|              9997|
+-------+------------------+------------------+



### Filtering and Selecting Rows

In [15]:
# Rows taller than 100, selected with an explicit boolean mask via .loc.
pandas_df.loc[pandas_df["height"] > 100]

Unnamed: 0,id,identifier,species_id,height,weight,base_experience,order,is_default
320,321,wailord,321,145,3980,175,387,1
792,10072,steelix-mega,208,105,7400,214,120,0
799,10079,rayquaza-mega,384,108,3920,351,467,0


In [16]:
# Same selection in Spark; where() is an alias of filter() and accepts a SQL predicate.
sparkDF.where("height > 100").show()

+-----+-------------+----------+------+------+---------------+-----+----------+
|   id|   identifier|species_id|height|weight|base_experience|order|is_default|
+-----+-------------+----------+------+------+---------------+-----+----------+
|  321|      wailord|       321|   145|  3980|            175|  387|         1|
|10072| steelix-mega|       208|   105|  7400|            214|  120|         0|
|10079|rayquaza-mega|       384|   108|  3920|            351|  467|         0|
+-----+-------------+----------+------+------+---------------+-----+----------+



### Counting Unique (Distinct) Values

In [17]:
# Number of distinct species ids; dropna=False counts NaN as a value, like len(pd.unique(...)).
pandas_df['species_id'].nunique(dropna=False)

721

In [18]:
# Spark equivalent: dropDuplicates() with no subset is the same as distinct().
sparkDF.select("species_id").dropDuplicates().count()

721

### GroupBy and Aggregates

In [19]:
# Forms per species, largest first; the two most common species are shown.
(
    pandas_df.groupby('species_id')[['id']]
    .count()
    .sort_values(by='id', ascending=False)
    .head(2)
)

Unnamed: 0_level_0,id
species_id,Unnamed: 1_level_1
25,7
479,6


In [20]:
# Spark equivalent: count ids per species and order by that count, descending.
(
    sparkDF.groupBy("species_id")
    .agg(count("id").alias("counts"))
    .orderBy(col("counts").desc())
    .show(2)
)

+----------+------+
|species_id|counts|
+----------+------+
|        25|     7|
|       479|     6|
+----------+------+
only showing top 2 rows



In [21]:
# Species table; joined to the pokemon table on pokemon.species_id == species.id below.
joiner_df = pd.read_csv(path+"pokemon_species.csv")

In [22]:
# Columns of the species table (its `id` is the join key).
joiner_df.columns

Index([u'id', u'identifier', u'generation_id', u'evolves_from_species_id',
       u'evolution_chain_id', u'color_id', u'shape_id', u'habitat_id',
       u'gender_rate', u'capture_rate', u'base_happiness', u'is_baby',
       u'hatch_counter', u'has_gender_differences', u'growth_rate_id',
       u'forms_switchable', u'order', u'conquest_order'],
      dtype='object')

In [23]:
# Columns of the pokemon table, listed again to pick the join key (`species_id`).
pandas_df.columns

Index([u'id', u'identifier', u'species_id', u'height', u'weight',
       u'base_experience', u'order', u'is_default'],
      dtype='object')

In [24]:
# Left-join every pokemon to its species row on pokemon.species_id == species.id,
# matching the Spark join further below. left_index/right_index are booleans
# ("join on the index"): passing column lists there silently joined on the
# default RangeIndex (row position), and newer pandas rejects non-bool values.
pandas_df.merge(joiner_df, left_on='species_id', right_on='id', how='left').head(2)

Unnamed: 0,id_x,identifier_x,species_id,height,weight,base_experience,order_x,is_default,id_y,identifier_y,...,gender_rate,capture_rate,base_happiness,is_baby,hatch_counter,has_gender_differences,growth_rate_id,forms_switchable,order_y,conquest_order
0,1,bulbasaur,1,7,69,64,1,1,1.0,bulbasaur,...,1.0,45.0,70.0,0.0,20.0,0.0,4.0,0.0,1.0,
1,2,ivysaur,2,10,130,142,2,1,2.0,ivysaur,...,1.0,45.0,70.0,0.0,20.0,0.0,4.0,0.0,2.0,


In [25]:
# Spark DataFrame version of the species table, used in the join below.
joiner_sparkDF = sqlContext.createDataFrame(joiner_df)

In [26]:
# Left join in Spark; note both `id`, `identifier` and `order` columns survive, unsuffixed.
sparkDF.join(
    joiner_sparkDF,
    on=sparkDF["species_id"] == joiner_sparkDF["id"],
    how="left",
).show(2)

+---+----------+----------+------+------+---------------+-----+----------+---+----------+-------------+-----------------------+------------------+--------+--------+----------+-----------+------------+--------------+-------+-------------+----------------------+--------------+----------------+-----+--------------+
| id|identifier|species_id|height|weight|base_experience|order|is_default| id|identifier|generation_id|evolves_from_species_id|evolution_chain_id|color_id|shape_id|habitat_id|gender_rate|capture_rate|base_happiness|is_baby|hatch_counter|has_gender_differences|growth_rate_id|forms_switchable|order|conquest_order|
+---+----------+----------+------+------+---------------+-----+----------+---+----------+-------------+-----------------------+------------------+--------+--------+----------+-----------+------------+--------------+-------+-------------+----------------------+--------------+----------------+-----+--------------+
|406|     budew|       406|     2|    12|             56| 

### RDDs

In [27]:
# Read the CSV as an RDD of raw text lines (header included) and collect each line's length.
pokemon_rdd = sc.textFile(path+"pokemon.csv")
line_lenghts_from_rdd = pokemon_rdd.map(len).collect()  # name spelling kept: later cells use it

In [28]:
# Compute the same per-line lengths locally, for comparison with the RDD result.
with open(path+"pokemon.csv") as f:
    lines = f.readlines()

# Spark's textFile (Hadoop line reader) drops "\n" and "\r\n" terminators, so
# strip both: strip("\n") alone leaves a trailing "\r" on CRLF files and makes
# every length one longer than Spark's.
line_lengths = [len(line.rstrip("\r\n")) for line in lines]

In [29]:
# Local line count; the header line is included, hence one more than the 811 DataFrame rows.
len(line_lengths)

812

In [30]:
# Line count from Spark's textFile; compare with len(line_lengths) from the local read.
len(line_lenghts_from_rdd)

812

In [31]:
# Sorted RDD line lengths, for comparison with sorted(line_lengths).
# NOTE(review): an equality check would be easier to read than dumping ~800 values.
sorted(line_lenghts_from_rdd)

[24,
 24,
 24,
 25,
 25,
 25,
 25,
 25,
 25,
 25,
 25,
 25,
 25,
 26,
 26,
 26,
 26,
 26,
 26,
 26,
 26,
 26,
 26,
 26,
 26,
 26,
 26,
 26,
 26,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,


In [32]:
# Sorted locally computed line lengths; should match sorted(line_lenghts_from_rdd).
sorted(line_lengths)

[24,
 24,
 24,
 25,
 25,
 25,
 25,
 25,
 25,
 25,
 25,
 25,
 25,
 26,
 26,
 26,
 26,
 26,
 26,
 26,
 26,
 26,
 26,
 26,
 26,
 26,
 26,
 26,
 26,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 27,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 28,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,
 29,


### Running scikit-learn (DBSCAN) on RDDs

In [57]:
# Data taken from https://support.spatialkey.com/spatialkey-sample-csv-data/
temp = pd.read_csv(path+"SacramentocrimeJanuary2006.csv")
# Keep only district/latitude/longitude and write them pipe-delimited, without
# the pandas index, as input for the RDD-based DBSCAN cells below.
temp[['district','latitude','longitude']].to_csv(path+"location_super.csv",sep="|",index=False)

In [42]:
# Read back the pipe-delimited location subset.
# NOTE(review): this cell's execution count (In[42]) is lower than the export
# cell's (In[57]), so it originally ran against a previously written file;
# run the notebook top to bottom to avoid that hidden dependency.
site_names_pdf = pd.read_csv(path+"location_super.csv",sep="|")

In [43]:
# district parses as integer, latitude/longitude as float.
site_names_pdf.dtypes

district       int64
latitude     float64
longitude    float64
dtype: object

In [47]:
# Read the same file as raw text lines (header included) for the RDD-based DBSCAN pipeline.
sites_names_rdd = sc.textFile(path+"location_super.csv")

In [46]:
def split_row(x):
    """Split one raw pipe-delimited text line into its list of field strings."""
    fields = x.split("|")
    return fields

def prepare_for_dbscan(x):
    """Convert one grouped ``(key, points)`` pair into coordinate records.

    Parameters
    ----------
    x : tuple
        ``(key, points)`` as produced by ``groupByKey``. ``points`` is an
        iterable of sequences whose first two items are the latitude and
        longitude as numeric text; any further items are ignored.

    Returns
    -------
    tuple
        ``(key, records)``: ``records`` is a list of
        ``{'latitude': float, 'longitude': float}`` dicts in iteration order.
    """
    key, points = x[0], x[1]
    # Built-in float() parses str/unicode directly. The np.float alias it
    # replaces was removed in NumPy 1.24, and the .encode("utf-8") step was
    # unnecessary for numeric text.
    records = [
        {'latitude': float(point[0]), 'longitude': float(point[1])}
        for point in points
    ]
    return key, records

def create_pandas_df(x):
    """Wrap the records of a ``(key, records)`` pair in a pandas DataFrame.

    Returns ``(key, DataFrame)``; the key is passed through unchanged.
    """
    key = x[0]
    frame = pd.DataFrame(x[1])
    return key, frame

def pandas_df_to_matrix(x):
    """Turn a ``(key, DataFrame)`` pair into ``(key, ndarray)`` of [latitude, longitude] rows.

    ``DataFrame.as_matrix`` was deprecated in pandas 0.23 and removed in 1.0.
    Selecting the two columns and taking ``.values`` yields the same array on
    every pandas version.
    """
    key, frame = x[0], x[1]
    return key, frame[['latitude', 'longitude']].values

def create_clusters_via_dbscan(x, eps_km=0.7, min_samples=1):
    """Cluster one group's points with haversine DBSCAN and count the clusters.

    Parameters
    ----------
    x : tuple
        ``(key, coords)`` where ``coords`` is an (n, 2) array of
        ``[latitude, longitude]`` in degrees.
    eps_km : float, optional
        Neighbourhood radius in kilometres (default 0.7, the original value).
    min_samples : int, optional
        DBSCAN ``min_samples`` (default 1, the original value).

    Returns
    -------
    tuple
        ``(key, num_clusters, num_points)``. ``num_clusters`` does not count
        DBSCAN's noise label ``-1``. With ``min_samples=1`` every point is a
        core point, so no noise occurs and the result matches the original.
    """
    kms_per_radian = 6371.0088  # km per radian of arc (mean Earth radius)
    # The haversine metric works in radians, so convert both eps and coords.
    epsilon = eps_km / kms_per_radian
    coords = x[1]
    db = DBSCAN(eps=epsilon, min_samples=min_samples, algorithm='ball_tree',
                metric='haversine').fit(np.radians(coords))
    # -1 marks noise points, which do not form a cluster. (The previously built,
    # never-returned per-cluster Series was dead work and has been removed.)
    num_clusters = len(set(db.labels_) - {-1})
    return x[0], num_clusters, len(coords)

In [48]:
# Full pipeline: split lines, drop the header, group points per district, then
# run DBSCAN per district -> (district, num_clusters, num_points).
(
    sites_names_rdd
    .map(split_row)
    .filter(lambda fields: fields[0] != 'district')   # drop the header row
    .map(lambda fields: (fields[0], fields[1:]))       # (district, [lat, lon])
    .groupByKey()
    .map(prepare_for_dbscan)
    .map(create_pandas_df)
    .map(pandas_df_to_matrix)
    .map(create_clusters_via_dbscan)
    .take(20)
)

[(u'1', 4, 868),
 (u'3', 1, 1575),
 (u'5', 1, 1159),
 (u'2', 5, 1462),
 (u'4', 1, 1161),
 (u'6', 1, 1359)]

In [49]:
# Inspect the split rows; the first one is the header.
sites_names_rdd.map(split_row).take(5)

[[u'district', u'latitude', u'longitude'],
 [u'3', u'38.55042047', u'-121.3914158'],
 [u'5', u'38.47350069', u'-121.4901858'],
 [u'2', u'38.65784584', u'-121.4621009'],
 [u'6', u'38.50677377', u'-121.4269508']]

In [58]:
(
    sites_names_rdd
    .map(split_row)
    .filter(lambda fields: fields[0] != 'district')   # drop the header row
    .take(5)
)

[[u'3', u'38.55042047', u'-121.3914158'],
 [u'5', u'38.47350069', u'-121.4901858'],
 [u'2', u'38.65784584', u'-121.4621009'],
 [u'6', u'38.50677377', u'-121.4269508'],
 [u'2', u'38.6374478', u'-121.3846125']]

In [59]:
(
    sites_names_rdd
    .map(split_row)
    .filter(lambda fields: fields[0] != 'district')   # drop the header row
    .map(lambda fields: (fields[0], fields[1:]))       # (district, [lat, lon])
    .take(5)
)

[(u'3', [u'38.55042047', u'-121.3914158']),
 (u'5', [u'38.47350069', u'-121.4901858']),
 (u'2', [u'38.65784584', u'-121.4621009']),
 (u'6', [u'38.50677377', u'-121.4269508']),
 (u'2', [u'38.6374478', u'-121.3846125'])]

In [60]:
(
    sites_names_rdd
    .map(split_row)
    .filter(lambda fields: fields[0] != 'district')   # drop the header row
    .map(lambda fields: (fields[0], fields[1:]))       # (district, [lat, lon])
    .groupByKey()
    .take(5)
)

[(u'1', <pyspark.resultiterable.ResultIterable at 0x119d00a50>),
 (u'3', <pyspark.resultiterable.ResultIterable at 0x119cf8950>),
 (u'5', <pyspark.resultiterable.ResultIterable at 0x119cf8b10>),
 (u'2', <pyspark.resultiterable.ResultIterable at 0x1199d0850>),
 (u'4', <pyspark.resultiterable.ResultIterable at 0x11a3e6890>)]

In [61]:
def prepare_for_dbscan(x):
    """Convert one grouped ``(key, points)`` pair into coordinate records.

    Parameters
    ----------
    x : tuple
        ``(key, points)`` as produced by ``groupByKey``. ``points`` is an
        iterable of sequences whose first two items are the latitude and
        longitude as numeric text; any further items are ignored.

    Returns
    -------
    tuple
        ``(key, records)``: ``records`` is a list of
        ``{'latitude': float, 'longitude': float}`` dicts in iteration order.
    """
    key, points = x[0], x[1]
    # Built-in float() parses str/unicode directly. The np.float alias it
    # replaces was removed in NumPy 1.24, and the .encode("utf-8") step was
    # unnecessary for numeric text.
    records = [
        {'latitude': float(point[0]), 'longitude': float(point[1])}
        for point in points
    ]
    return key, records

(
    sites_names_rdd
    .map(split_row)
    .filter(lambda fields: fields[0] != 'district')   # drop the header row
    .map(lambda fields: (fields[0], fields[1:]))       # (district, [lat, lon])
    .groupByKey()
    .map(prepare_for_dbscan)
    .take(1)
)

[(u'1',
  [{'latitude': 38.60960217, 'longitude': -121.4918375},
   {'latitude': 38.65994218, 'longitude': -121.5259008},
   {'latitude': 38.60893745, 'longitude': -121.5187927},
   {'latitude': 38.64378844, 'longitude': -121.5341593},
   {'latitude': 38.62798948, 'longitude': -121.4857344},
   {'latitude': 38.61153542, 'longitude': -121.5370613},
   {'latitude': 38.63036289, 'longitude': -121.4755269},
   {'latitude': 38.62453557, 'longitude': -121.5264198},
   {'latitude': 38.62504428, 'longitude': -121.4978986},
   {'latitude': 38.61488957, 'longitude': -121.4934272},
   {'latitude': 38.63207451, 'longitude': -121.5287747},
   {'latitude': 38.67510292, 'longitude': -121.5226753},
   {'latitude': 38.67318557, 'longitude': -121.5035491},
   {'latitude': 38.66695301, 'longitude': -121.5088707},
   {'latitude': 38.62099831, 'longitude': -121.5104473},
   {'latitude': 38.65699142, 'longitude': -121.5038615},
   {'latitude': 38.63351404, 'longitude': -121.5282646},
   {'latitude': 38.6324

In [62]:
def create_pandas_df(x):
    """Wrap the records of a ``(key, records)`` pair in a pandas DataFrame.

    Returns ``(key, DataFrame)``; the key is passed through unchanged.
    """
    key = x[0]
    frame = pd.DataFrame(x[1])
    return key, frame

def pandas_df_to_matrix(x):
    """Turn a ``(key, DataFrame)`` pair into ``(key, ndarray)`` of [latitude, longitude] rows.

    ``DataFrame.as_matrix`` was deprecated in pandas 0.23 and removed in 1.0.
    Selecting the two columns and taking ``.values`` yields the same array on
    every pandas version.
    """
    key, frame = x[0], x[1]
    return key, frame[['latitude', 'longitude']].values

(
    sites_names_rdd
    .map(split_row)
    .filter(lambda fields: fields[0] != 'district')   # drop the header row
    .map(lambda fields: (fields[0], fields[1:]))       # (district, [lat, lon])
    .groupByKey()
    .map(prepare_for_dbscan)
    .map(create_pandas_df)
    .map(pandas_df_to_matrix)
    .take(2)
)

[(u'1', array([[  38.60960217, -121.4918375 ],
         [  38.65994218, -121.5259008 ],
         [  38.60893745, -121.5187927 ],
         ..., 
         [  38.62201506, -121.4700579 ],
         [  38.60799414, -121.4700221 ],
         [  38.61300454, -121.4918727 ]])),
 (u'3', array([[  38.55042047, -121.3914158 ],
         [  38.56433456, -121.4618826 ],
         [  38.55611545, -121.4142729 ],
         ..., 
         [  38.55790107, -121.4106352 ],
         [  38.57783198, -121.4704595 ],
         [  38.57203045, -121.4670118 ]]))]

In [63]:
def create_clusters_via_dbscan(x, eps_km=0.7, min_samples=1):
    """Cluster one group's points with haversine DBSCAN and count the clusters.

    Parameters
    ----------
    x : tuple
        ``(key, coords)`` where ``coords`` is an (n, 2) array of
        ``[latitude, longitude]`` in degrees.
    eps_km : float, optional
        Neighbourhood radius in kilometres (default 0.7, the original value).
    min_samples : int, optional
        DBSCAN ``min_samples`` (default 1, the original value).

    Returns
    -------
    tuple
        ``(key, num_clusters, num_points)``. ``num_clusters`` does not count
        DBSCAN's noise label ``-1``. With ``min_samples=1`` every point is a
        core point, so no noise occurs and the result matches the original.
    """
    kms_per_radian = 6371.0088  # km per radian of arc (mean Earth radius)
    # The haversine metric works in radians, so convert both eps and coords.
    epsilon = eps_km / kms_per_radian
    coords = x[1]
    db = DBSCAN(eps=epsilon, min_samples=min_samples, algorithm='ball_tree',
                metric='haversine').fit(np.radians(coords))
    # -1 marks noise points, which do not form a cluster. (The previously built,
    # never-returned per-cluster Series was dead work and has been removed.)
    num_clusters = len(set(db.labels_) - {-1})
    return x[0], num_clusters, len(coords)

# Full pipeline: (district, num_clusters, num_points) per district.
(
    sites_names_rdd
    .map(split_row)
    .filter(lambda fields: fields[0] != 'district')   # drop the header row
    .map(lambda fields: (fields[0], fields[1:]))       # (district, [lat, lon])
    .groupByKey()
    .map(prepare_for_dbscan)
    .map(create_pandas_df)
    .map(pandas_df_to_matrix)
    .map(create_clusters_via_dbscan)
    .take(10)
)

[(u'1', 4, 868),
 (u'3', 1, 1575),
 (u'5', 1, 1159),
 (u'2', 5, 1462),
 (u'4', 1, 1161),
 (u'6', 1, 1359)]