In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
sc = pyspark.SparkContext(appName="Taxis")

In [3]:
trips = sc.textFile('hdfs://master:9000/yellow_tripdata_1m.csv')
vendors = sc.textFile('hdfs://master:9000/yellow_tripvendors_1m.csv')

## Part 1: Data Analysis ##

### a) Average trip duration per hour it started ###

In [4]:
from datetime import datetime

def strip_time(time):
    return datetime.strptime(time, '%Y-%m-%d %H:%M:%S')

In [5]:
avg_trp_dur = trips.map(lambda line: line.split(",")[1:3])\
        .map(lambda lst: (strip_time(lst[0]), strip_time(lst[1])))\
        .map(lambda lst: (lst[0].hour, lst[1]-lst[0]))\
        .mapValues(lambda x: (x, 1))\
        .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))\
        .map(lambda lst: (lst[0], lst[1][0].total_seconds()/(60*lst[1][1])))

In [6]:
print(avg_trp_dur.top(24))

[(23, 13.95847112523272), (22, 14.231797625637356), (21, 13.510855327392882), (20, 13.57589963636424), (19, 14.221208805985727), (18, 15.29045374860119), (17, 16.510825654408613), (16, 17.213072069374675), (15, 30.223498632126276), (14, 16.523138380789476), (13, 15.553918733195129), (12, 15.130881322885683), (11, 14.935821221905567), (10, 14.657939169698276), (9, 14.67010641976562), (8, 14.627504543367822), (7, 13.395006418527384), (6, 12.487420237563239), (5, 13.275583221175415), (4, 13.799857931121963), (3, 13.322282520526887), (2, 13.0356355926767), (1, 13.975069898907133), (0, 14.01779373736224)]


In [7]:
avg_trp_dur = avg_trp_dur.sortByKey(ascending=True)\
            .map(lambda tpl: {"HourOfDay":tpl[0], "AverageTripTime":tpl[1]})
fieldnames=["HourOfDay", "AverageTripTime"]

In [8]:
import csv
from io import StringIO

def writeRecords(records):
    """Write out CSV lines"""
    output = StringIO()
    writer = csv.DictWriter(output, fieldnames=fieldnames, delimiter='\t')
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]

In [9]:
!hadoop fs -rm -r /average_trip.csv 
avg_trp_dur.mapPartitions(writeRecords).saveAsTextFile("hdfs://master:9000/average_trip.csv")

rm: `/average_trip.csv': No such file or directory


### b) Max amount paid per vendor ####

In [10]:
vendors_key_val = vendors.map(lambda line: line.split(",")[0:2])\
                        .map(lambda lst: (lst[0],lst[1]))

In [11]:
trips_key_val = trips.map(lambda line: line.split(",")[:])\
                    .map(lambda lst: (lst[0],lst[7]))

In [12]:
joinRDD = vendors_key_val.join(trips_key_val)\
                    .map(lambda tpl: tpl[1])\
                    .reduceByKey(max)

In [13]:
print(joinRDD.top(2))

[('2', '99.99'), ('1', '995.3')]


In [14]:
max_by_vndr = joinRDD.sortByKey(ascending=True)\
            .map(lambda tpl: {"VendorID":tpl[0], "MaxAmountPaid":tpl[1]})
fieldnames=["VendorID", "MaxAmountPaid"]

In [15]:
!hadoop fs -rm -r /max_amount.csv 
max_by_vndr.mapPartitions(writeRecords).saveAsTextFile("hdfs://master:9000/max_amount.csv")

rm: `/max_amount.csv': No such file or directory


## Part 2: Machine Learning ##

In [16]:
from pyspark.mllib.clustering import KMeans, KMeansModel
from numpy import array

In [17]:
ml_data = trips.map(lambda line: line.split(",")[3:5])\
                .map(lambda lst: array([float(lst[0]), float(lst[1])]))

In [18]:
init_model = KMeansModel(ml_data.take(5))

In [19]:
clusters = KMeans.train(ml_data, 5, maxIterations=3, initializationMode="kmeans||", initialModel=init_model)

In [20]:
clusters.clusterCenters

[array([-78.50386634,  40.61111272]),
 array([-0.0001304 ,  0.00050874]),
 array([-73.95393671,  40.69903131]),
 array([-73.99216792,  40.74268109]),
 array([-73.96113226,  40.77182523])]

In [21]:
i = 1
centroids = []
for c in clusters.clusterCenters:
    centroids.append((i, c))
    i += 1

In [22]:
p_centroids = sc.parallelize(centroids)
p_centroids = p_centroids.map(lambda tpl: {"ID":tpl[0], "Centroid":tpl[1]})
fieldnames=["ID", "Centroid"]

In [23]:
!hadoop fs -rm -r /centroids.csv 
p_centroids.mapPartitions(writeRecords).saveAsTextFile("hdfs://master:9000/centroids.csv")

rm: `/centroids.csv': No such file or directory


## Part 3: Page Rank ##


In [47]:
pages = sc.textFile('hdfs://master:9000/web-Google.txt')

In [65]:
edges = pages.filter(lambda line: line[0]!='#')\
            .map(lambda line: line.split('\t'))\
            .map(lambda lst: (int(lst[0]), int(lst[1])))

In [66]:
N = 875713
p0 = 0.5
d = 0.85

constant = 1-d/N

In [67]:
print(edges.top(2))

[(916427, 910217), (916427, 843844)]


In [68]:
nodes = edges.map(lambda tpl: tpl[0]).distinct()

In [69]:
print(nodes.top(2))

[916427, 916425]


In [84]:
nodes_info = edges.groupByKey().mapValues(list)\
                  .map(lambda x: (x[0], x[1], p0))

In [None]:
print(nodes_info.top(1))

##### 