In [1]:
import pyspark
from pyspark.sql import Row
from pyspark.sql import SQLContext
sc = pyspark.SparkContext('local[*]')
sqlContext = SQLContext(sc)

In [2]:
def get_DF(file_path):
    file_rdd = sc.textFile(file_path)
    header = file_rdd.first() # Obtenemos el header
    DF = file_rdd.filter(lambda line: line != header)\
                            .map(lambda line: line.replace('/t', ' '))\
                            .map(lambda line: create_row(header)(*line.split(',')))\
                            .toDF()
    return DF

In [3]:
def create_row(header): 
    return Row (*tuple(header.split(',')))

In [4]:
data_frame = get_DF('fatalities_country.csv')

In [5]:
data_frame.printSchema()

root
 |-- country_id: string (nullable = true)
 |-- fatalities: string (nullable = true)



In [6]:
data_frame.registerTempTable('africa')

In [7]:
def get_country_df(country):
    return sqlContext.sql('select country_id, fatalities from africa where country_id = %s' % country)

In [8]:
for i in range(1,49):
    data_frame = get_country_df(i)
    total_fatalities = data_frame.map(lambda x: x.fatalities).reduce(lambda a,b: int(a)+int(b))
    total_events = data_frame.count()
    print "INSERT INTO public.fatalities VALUES (%s,%s,%s);" % (i, total_events, total_fatalities)

INSERT INTO public.fatalities VALUES (1,765,1334);
INSERT INTO public.fatalities VALUES (2,183,16);
INSERT INTO public.fatalities VALUES (3,1204,1265);
INSERT INTO public.fatalities VALUES (4,8228,74883);
INSERT INTO public.fatalities VALUES (5,760,3101);
INSERT INTO public.fatalities VALUES (6,2120,45986);
INSERT INTO public.fatalities VALUES (7,613,7585);
INSERT INTO public.fatalities VALUES (8,17489,23875);
INSERT INTO public.fatalities VALUES (9,1558,3951);
INSERT INTO public.fatalities VALUES (10,370,497);
INSERT INTO public.fatalities VALUES (11,8541,50157);
INSERT INTO public.fatalities VALUES (12,503,3329);
INSERT INTO public.fatalities VALUES (13,501,258);
INSERT INTO public.fatalities VALUES (14,115,14);
INSERT INTO public.fatalities VALUES (15,367,400);
INSERT INTO public.fatalities VALUES (16,3698,12939);
INSERT INTO public.fatalities VALUES (17,1004,231);
INSERT INTO public.fatalities VALUES (18,932,1382);
INSERT INTO public.fatalities VALUES (19,273,63);
INSERT INTO publi

In [21]:
data_frame = get_DF('info.csv')

In [28]:
data_frame.registerTempTable('africa_countries')

In [29]:
sqlContext.sql('show tables').show()

+----------------+-----------+
|       tableName|isTemporary|
+----------------+-----------+
|africa_countries|       true|
|          africa|       true|
+----------------+-----------+



In [31]:
from dateutil.parser import parse
from pyspark.mllib.stat import Statistics
from pyspark.mllib.clustering import KMeans, StreamingKMeans, GaussianMixture, PowerIterationClustering
from pyspark.mllib.feature import StandardScaler
import numpy as np
from pyspark.mllib.linalg import Vectors
import datetime

In [25]:
rdd = sqlContext.sql('select country_id, event_date, fatalities, latitud, longitud from africa_countries')\
        .map(lambda x: Vectors.dense(float(x.latitud),\
                                     float(x.longitud),
                                     int((datetime.datetime.now()-parse(x.event_date)).seconds),\
                                     int(x.fatalities)))

In [26]:
features = rdd.map(lambda x: [x[0],x[1],x[2]])
fatalities = rdd.map(lambda x: x[3])
scaler = StandardScaler(withMean=True, withStd=True).fit(features)
normalizedData = scaler.transform(features)
nClusters = 10
model = KMeans().train(normalizedData, nClusters, maxIterations=100,\
                        initializationMode="k-means||")
labels = model.predict(normalizedData)
labels.distinct().count()

5

In [27]:
labels.take(5)

[2, 2, 2, 2, 2]