In [1]:
from pyspark import SparkContext
import os
# Work from the course folder: the data files read later in this notebook are
# located relative to os.getcwd().
# NOTE: this is a machine-specific absolute path; adjust it when running elsewhere.
os.chdir('/Users/chkapsalis/Documents/GitHub/Big_Data_Architectures/3. Spark')

# Local Spark context: master "local[1]" (a single local worker thread), application name "app".
sc = SparkContext(master="local[1]", appName="app")

25/03/16 17:55:00 WARN Utils: Your hostname, ChristoorossAir resolves to a loopback address: 127.0.0.1; using 192.168.1.18 instead (on interface en0)
25/03/16 17:55:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/16 17:55:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


All pair-RDD operations take an optional second parameter that sets the number of tasks to run:
```
words.reduceByKey(lambda x, y: x + y, 5)
words.groupByKey(5)
visits.join(pageViews, 5)
```

## Using Local Variables in Spark

External variables referenced in a closure are automatically shipped to the cluster:
```
query = input('Enter a query: ')
pages.filter(lambda x: x.startswith(query)).count()
```

### Caveats
- Each task gets a new copy (updates are NOT sent back)
- Variables must be Serializable (Java/Scala) or pickle-able (Python)
- Don't use fields of an outer object inside a closure (PySpark will ship the whole object!)

## The PageRank Algorithm


### Basic Idea:  
It gives pages ranks (scores) based on links to them ("backlinks")
- Links from many pages lead to a higher rank.
- Links from high-rank pages also contribute to a higher rank.

### Algorithm Description
1. Start each page at rank 1.
2. On each iteration, have each page $p$ contribute $\text{rank}_p / |\text{neighbors}_p|$ to each of its neighbors.
3. Set each page's rank to $0.15 + 0.85 \times$ (the sum of the contributions it received).

In [2]:
IterN = 2  # number of PageRank iterations to run

# RDD of (url, neighbors) pairs: the pages each url contributes rank to
links = sc.parallelize([
    (1, [2]),
    (2, [4]),
    (3, [1, 2]),
    (4, [2, 3]),
])
# RDD of (url, rank) pairs: every page starts with rank 1
ranks = sc.parallelize([(url, 1) for url in (1, 2, 3, 4)])

In [3]:
def compute_contribs(pair):
    """Split one page's current rank evenly among the pages it links to.

    `pair` is one element of `links.join(ranks)`, i.e. it has the shape
    (url, (neighbors, rank)).

    Returns a list with one (neighbor, contribution) tuple per neighbor, where
    contribution = rank / number_of_neighbors. A page with no neighbors yields
    an empty list.
    """
    _url, (neighbors, rank) = pair  # the page's own url is not needed here
    contributions = []
    for neighbor in neighbors:
        # every neighbor receives the same share of this page's rank
        contributions.append((neighbor, rank / len(neighbors)))
    return contributions

In [4]:
for i in range(IterN):
    # Attach the current rank to every page's neighbor list, then fan the rank
    # out as one (neighbor, contribution) pair per outgoing link.
    contribs = links.join(ranks).flatMap(compute_contribs)
    # New rank = 0.15 + 0.85 * (sum of the contributions a page received).
    # NOTE: a page that receives no contributions (no inbound links) does not
    # appear in `contribs`, so it drops out of `ranks` from this iteration on.
    ranks = (
        contribs
        .reduceByKey(lambda acc, contribution: acc + contribution)
        .mapValues(lambda received: 0.15 + 0.85 * received)
    )

    # Show the ranks reached after this iteration.
    print(ranks.collect())

                                                                                

[(4, 1.0), (2, 1.8499999999999999), (3, 0.575), (1, 0.575)]
[(3, 0.575), (1, 0.394375), (4, 1.7224999999999997), (2, 1.3081249999999998)]


```



```

## The Airlines Delay Algorithm

In [5]:
import matplotlib.pyplot as plt 
from datetime import datetime 

# strptime pattern for date fields: four-digit year, month and day separated by dashes
DATE_FMT = "%Y-%m-%d"
# strptime pattern for time fields: 24-hour clock hour and minute with no separator
TIME_FMT = "%H%M"

In [22]:
def parse(row):
    """Convert the raw fields of one flight record and return (airline, dep_delay, arv_delay).

    `row` is a mutable sequence of strings (one CSV line already split on
    commas). The following columns are converted in place:

    * column 0             -> date, parsed with DATE_FMT
    * columns 5 and 8      -> time, parsed with TIME_FMT
    * columns 6, 7, 9, 10  -> float

    A field that cannot be converted (ValueError) is replaced by a neutral
    fallback instead of failing the record: 1970-01-01 for the date, 00:00 for
    the times and 0.0 for the numbers. (Such records could alternatively be
    dropped altogether.)

    Returns the tuple (row[1], row[7], row[10]).
    """
    def as_date(text):
        return datetime.strptime(text, DATE_FMT).date()

    def as_time(text):
        return datetime.strptime(text, TIME_FMT).time()

    # (column index, converter, fallback factory). Fallbacks are factories so
    # that they are only computed when a field actually fails to convert.
    conversions = (
        (0, as_date, lambda: as_date('1970-01-01')),
        (5, as_time, lambda: as_time('0000')),
        (6, float, lambda: 0.0),
        (7, float, lambda: 0.0),
        (8, as_time, lambda: as_time('0000')),
        (9, float, lambda: 0.0),
        (10, float, lambda: 0.0),
    )
    for column, convert, make_fallback in conversions:
        try:
            row[column] = convert(row[column])
        except ValueError:
            row[column] = make_fallback()

    return row[1], row[7], row[10]

In [23]:
# Build the airlines lookup dictionary: each line of the CSV is split on commas
# and its first column is mapped to its second column. The file is small
# enough to be collected to the driver as a plain dict.
airlines = (
    sc.textFile('file:///' + os.getcwd() + '/airlines_no_header.csv')
    .map(lambda raw_line: raw_line.split(','))
    .map(lambda fields: (fields[0], fields[1]))
    .collectAsMap()
)

In [24]:
# Read the flights CSV as an RDD: split every line on commas, then let parse()
# reduce each record to its (airline, dep_delay, arv_delay) tuple.
flights = (
    sc.textFile('file:///' + os.getcwd() + '/ontime_flights_no_header.csv')
    .map(lambda raw_line: raw_line.split(','))
    .map(parse)
)

In [25]:
# Per flight: key by airline and add the departure delay to the arrival delay
delays = flights.map(lambda flight: (flight[0], flight[1] + flight[2]))

In [26]:
# Sum the per-flight delays of every airline and bring the totals to the driver.
# NOTE: this rebinds `delays` from an RDD to a plain list, so the previous cell
# must be re-run before this one can be executed again.
delays = delays.reduceByKey(lambda running_total, delay: running_total + delay).collect()
# Order the airlines from the least to the most delayed.
delays.sort(key=lambda airline_total: airline_total[1])
for airline_id, minutes in delays:
    print(f'{airline_id}: {minutes} minutes delayed in total. ({airlines[airline_id]})')

[Stage 13:>                                                         (0 + 1) / 1]

19690: 39000.0 minutes delayed in total. (Hawaiian Airlines Inc.)
21171: 96774.0 minutes delayed in total. (Virgin America)
20437: 127775.0 minutes delayed in total. (AirTran Airways Corporation)
19930: 133306.0 minutes delayed in total. (Alaska Airlines Inc.)
20436: 156030.0 minutes delayed in total. (Frontier Airlines Inc.)
20409: 520070.0 minutes delayed in total. (JetBlue Airways)
20355: 549922.0 minutes delayed in total. (US Airways Inc.)
20398: 807684.0 minutes delayed in total. (American Eagle Airlines Inc.)
19977: 840763.0 minutes delayed in total. (United Air Lines Inc.)
19805: 914824.0 minutes delayed in total. (American Airlines Inc.)
20304: 1050982.0 minutes delayed in total. (SkyWest Airlines Inc.)
19790: 1104519.0 minutes delayed in total. (Delta Air Lines Inc.)
20366: 1766868.0 minutes delayed in total. (ExpressJet Airlines Inc.)
19393: 2740828.0 minutes delayed in total. (Southwest Airlines Co.)


                                                                                