# B: Spark

**In this notebook:**
* importing PySpark modules
* setting up the Spark context
* testing PySpark DataFrames with orders and customers (task B.i)
* testing PySpark DataFrames with iterations (task B.ii)

## Installation

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)

## Imports

In [1]:
from pyspark import SparkContext, SparkConf
import pandas as pd
from pyspark.sql import *
from pyspark.sql import functions as F

## Spark Context


In [2]:
conf = SparkConf().setAppName('HW1').setMaster("local[8]")
sc = SparkContext(conf=conf)

In [3]:
sqlContext = SQLContext(sc)

## (i) Find Relevant Customer Information

In this section I use the *orders.csv* and *customers.csv* files to explore the PySpark DataFrame API. Specifically, I find the name, the address, and the average order price of each customer whose acctbal is more than 1000, counting only orders placed after 1995-01-01.

### Load Data

First I load the content of the CSV files into Spark DataFrames. Then I assign the appropriate column names and drop the columns that are not needed.

In [7]:
orders = (
    sqlContext.read.format("csv")
    .options(header="false", inferSchema="True", delimiter="|", dateFormat="yyyy-MM-dd")
    .load("../../../../data/orders.csv")
)
order_columns = [
    "orderkey",
    "custkey",
    "orderstatus",
    "price",
    "orderdate",
    "orderpriority",
    "clerk",
    "shippriority",
    "comment",
]
orders = orders.toDF(*order_columns)["orderkey", "custkey", "price", "orderdate"]
print("Orders count: ", orders.count())
orders.printSchema()
orders.show(5)

Orders count:  15000
root
 |-- orderkey: integer (nullable = true)
 |-- custkey: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- orderdate: string (nullable = true)

+--------+-------+---------+----------+
|orderkey|custkey|    price| orderdate|
+--------+-------+---------+----------+
|       1|    370|172799.49|1996-01-02|
|       2|    781| 38426.09|1996-12-01|
|       3|   1234| 205654.3|1993-10-14|
|       4|   1369| 56000.91|1995-10-11|
|       5|    445|105367.67|1994-07-30|
+--------+-------+---------+----------+
only showing top 5 rows



In [8]:
customers = (
    sqlContext.read.format("csv")
    .options(header="false", inferSchema="True", delimiter="|", dateFormat="yyyy-MM-dd")
    .load("../../../../data/customers.csv")
)
customers_columns = [
    "custkey",
    "name",
    "addres",
    "nationkey",
    "phone",
    "acctbal",
    "mktsegment",
    "comment",
]
customers = customers.toDF(*customers_columns)["custkey", "name", "addres", "acctbal"]
print("Customer count: ", customers.count())

customers.printSchema()
customers.show(5)

Customer count:  1500
root
 |-- custkey: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- addres: string (nullable = true)
 |-- acctbal: double (nullable = true)

+-------+------------------+--------------------+-------+
|custkey|              name|              addres|acctbal|
+-------+------------------+--------------------+-------+
|      1|Customer#000000001|   IVhzIApeRb ot,c,E| 711.56|
|      2|Customer#000000002|XSTf4,NCwDVaWNe6t...| 121.65|
|      3|Customer#000000003|        MG9kdTD2WBHm|7498.12|
|      4|Customer#000000004|         XxVSJsLAGtn|2866.83|
|      5|Customer#000000005|KvpyuHCplrB84WgAi...| 794.47|
+-------+------------------+--------------------+-------+
only showing top 5 rows
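
The loading pattern used in both cells (header-less, pipe-delimited file, column names assigned afterwards, unneeded columns dropped) can be illustrated without Spark using Python's standard `csv` module. This is only a sketch: the two sample rows are made-up values in the column layout of *customers.csv*, and `load_customers` is a hypothetical helper that is not part of the notebook.

```python
import csv
import io

CUSTOMER_COLUMNS = ["custkey", "name", "addres", "nationkey",
                    "phone", "acctbal", "mktsegment", "comment"]
KEEP = ["custkey", "name", "addres", "acctbal"]


def load_customers(text):
    """Parse header-less, pipe-delimited text and keep only the KEEP columns."""
    rows = []
    for fields in csv.reader(io.StringIO(text), delimiter="|"):
        # Assign column names by position, then drop the unneeded columns
        named = dict(zip(CUSTOMER_COLUMNS, fields))
        record = {key: named[key] for key in KEEP}
        # csv yields strings, so cast the numeric columns explicitly
        record["custkey"] = int(record["custkey"])
        record["acctbal"] = float(record["acctbal"])
        rows.append(record)
    return rows


# Made-up sample rows (illustrative only, not taken from the real file)
SAMPLE = (
    "1|Customer#000000001|Some Street 1|15|25-000-000-0000|711.56|BUILDING|toy comment\n"
    "3|Customer#000000003|Other Road 9|1|11-000-000-0000|7498.12|AUTOMOBILE|toy comment\n"
)

customers = load_customers(SAMPLE)
print(customers[0])
```

Unlike Spark's `inferSchema`, the sketch casts the numeric columns by hand, which makes the expected types explicit.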



### Filter DataFrame

To find the matching customers, I filter out orders placed before 1995-01-01 and customers with an acctbal of less than 1000. In addition, I calculate the average price of the remaining orders per customer.

In [9]:
# Keep orders placed on or after 1995-01-01 (>= includes the boundary date)
orders_filtered_by_date = orders.filter(F.to_date(orders.orderdate) >= (F.lit("1995-01-01")))
print('Orders placed after 1995-01-01: ', orders_filtered_by_date.count())

# Group orders by customer keys and calculate avg price
customers_orders = orders_filtered_by_date.groupBy("custkey").agg(
    F.avg("price").alias("avg_price")
)
customers_orders.show(3)

Orders placed after 1995-01-01:  8134
+-------+------------------+
|custkey|         avg_price|
+-------+------------------+
|    496|143799.39299999998|
|   1342| 147223.2709090909|
|    463|103289.37076923078|
+-------+------------------+
only showing top 3 rows



In [10]:
# Keep customers with an acctbal of at least 1000 (>= includes the boundary value)
customers_with_high_acctbal = customers.filter(customers.acctbal >= 1000)
print('Customers with more than 1000 acctbal: ', customers_with_high_acctbal.count())
customers_with_high_acctbal.show(3)

Customers with more than 1000 acctbal:  1237
+-------+------------------+--------------------+-------+
|custkey|              name|              addres|acctbal|
+-------+------------------+--------------------+-------+
|      3|Customer#000000003|        MG9kdTD2WBHm|7498.12|
|      4|Customer#000000004|         XxVSJsLAGtn|2866.83|
|      6|Customer#000000006|sKZz0CsnMD7mp4Xd0...|7638.57|
+-------+------------------+--------------------+-------+
only showing top 3 rows



In [14]:
# Join
result = customers_with_high_acctbal.join(customers_orders, ['custkey'], how='inner')
print(f'We found {result.count()} customers who placed an order after 1995-01-01 and have an acctbal with over 1000.')
result.limit(10).toPandas()

We found 815 customers who placed an order after 1995-01-01 and have an acctbal with over 1000.


| | custkey | name | addres | acctbal | avg_price |
|---|---|---|---|---|---|
| 0 | 496 | Customer#000000496 | Y8oYLlHme6Z4fEzkTu | 8174.82 | 143799.393 |
| 1 | 1342 | Customer#000001342 | FD6UNqfsYMKkf3ZFZdI4EaYMZ | 1520.34 | 147223.270909 |
| 2 | 1088 | Customer#000001088 | YjXQtOJoM0nhClEy0,WFdNxvJ1g6xpn kL2ommEv | 2098.62 | 121228.133333 |
| 3 | 148 | Customer#000000148 | BhSPlEWGvIJyT9swk vCWE | 2135.6 | 132866.519 |
| 4 | 1238 | Customer#000001238 | HGCJI27,RIIQcS20,DcJbMQuUmN3Vhzdm | 4299.22 | 232410.616667 |
| 5 | 392 | Customer#000000392 | H7M6JObndO | 8492.33 | 131020.34 |
| 6 | 737 | Customer#000000737 | NdjG1k243iCLSoy1lYqMIrpvuH1Uf75 | 2501.74 | 192944.15 |
| 7 | 1483 | Customer#000001483 | ZjY1C b6cOnY3 | 4409.7 | 81726.607778 |
| 8 | 623 | Customer#000000623 | HXiFb9oWlgqZXrJPUCEJ6zZIPxAM4m6 | 7887.6 | 137388.762857 |
| 9 | 1084 | Customer#000001084 | E091r836A8TPqn5 | 1416.75 | 152526.045 |
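
The three steps of this section (date filter, per-customer average, inner join with the balance filter) can be checked by hand on a small dataset in plain Python. The rows below are made up for illustration, and the sketch uses the same `>=` comparisons as the notebook cells; it is not a replacement for the Spark code.

```python
from collections import defaultdict
from datetime import date

# Made-up toy rows: (orderkey, custkey, price, orderdate)
orders = [
    (1, 10, 100.0, date(1996, 1, 2)),
    (2, 10, 300.0, date(1995, 1, 1)),
    (3, 10, 999.0, date(1994, 12, 31)),  # too early, dropped
    (4, 20, 50.0, date(1997, 5, 5)),
    (5, 30, 70.0, date(1993, 3, 3)),     # too early, dropped
]
# Made-up toy rows: (custkey, name, address, acctbal)
customers = [
    (10, "Customer#10", "Address A", 2500.0),
    (20, "Customer#20", "Address B", 500.0),   # balance too low, dropped
    (30, "Customer#30", "Address C", 9000.0),  # no order in range, dropped by the join
]

CUTOFF = date(1995, 1, 1)
MIN_ACCTBAL = 1000

# Step 1: filter orders by date and collect the prices per customer
prices = defaultdict(list)
for _, custkey, price, orderdate in orders:
    if orderdate >= CUTOFF:
        prices[custkey].append(price)

# Step 2: average price per customer (the groupBy/agg step)
avg_price = {custkey: sum(values) / len(values) for custkey, values in prices.items()}

# Step 3: filter customers by balance and inner-join on custkey
result = [
    (custkey, name, address, acctbal, avg_price[custkey])
    for custkey, name, address, acctbal in customers
    if acctbal >= MIN_ACCTBAL and custkey in avg_price
]
print(result)  # [(10, 'Customer#10', 'Address A', 2500.0, 200.0)]
```

Only customer 10 survives: customer 20 fails the balance filter, and customer 30 has no order on or after the cutoff date, so the inner join drops it.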


In [None]:
# price needs to be calculated

### Save Results

In [18]:
result['name', 'addres', 'avg_price'].repartition(1).write.csv('output_b_i', sep='|', header=False)

## (ii): Find Shortest Path

### Load Graph Data

In [4]:
graph_data = (
    sqlContext.read.format("csv")
    .options(header="true", inferSchema="True", delimiter="\t")
    .load("../../../../data/ca-GrQc.txt")
).repartition(100).cache()

In [5]:
graph_data.show(5)
print(f'There are {graph_data.select(F.col("ToNodeId")).distinct().count()} distinct nodes')

+----------+--------+
|FromNodeId|ToNodeId|
+----------+--------+
|     21943|   15850|
|     21771|    5767|
|     19568|   25034|
|     16994|   22644|
|     23559|   18757|
+----------+--------+
only showing top 5 rows

There are 5242 distinct nodes


In [6]:
graph_data.printSchema()

root
 |-- FromNodeId: integer (nullable = true)
 |-- ToNodeId: integer (nullable = true)



### Create Initial DataFrames

In [15]:
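def to_path(row):
    """Turn a search row with columns '0'..'k' into (start, destination, path, cost)."""
    # Columns are named by hop index, so str(i) selects the i-th node on the path
    nodes = [str(row[str(i)]) for i in range(len(row))]
    # The cost is the number of edges, i.e. one less than the number of nodes
    return row['0'], row[str(len(row) - 1)], ", ".join(nodes), len(row) - 1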
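
`to_path` only needs item access by column name and `len`, so its behaviour can be checked without Spark by letting a plain dict stand in for a `Row`. The dict stand-in and the third node id (100) are assumptions made up for this illustration.

```python
def to_path(row):
    """Return (start, destination, path string, cost) for a row with columns '0'..'k'."""
    nodes = [str(row[str(i)]) for i in range(len(row))]
    return row["0"], row[str(len(row) - 1)], ", ".join(nodes), len(row) - 1


# A dict with the same column names stands in for a Spark Row (illustrative assumption)
two_hop_row = {"0": 17274, "1": 4298, "2": 100}
print(to_path(two_hop_row))  # (17274, 100, '17274, 4298, 100', 2)
```

The cost equals the number of edges on the path, which is why a row with three columns yields a cost of 2.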

In [18]:
search = graph_data.filter(graph_data.FromNodeId == 17274)
search = search.toDF('0', '1').cache()
search.show()
shortest_path = search.rdd.map(to_path).toDF().toDF('17274', 'destination-node-id', 'path', 'cost').repartition(50).cache()
shortest_path.show()

+-----+----+
|    0|   1|
+-----+----+
|17274|4298|
|17274|4416|
+-----+----+

+-----+-------------------+-----------+----+
|17274|destination-node-id|       path|cost|
+-----+-------------------+-----------+----+
|17274|               4416|17274, 4416|   1|
|17274|               4298|17274, 4298|   1|
+-----+-------------------+-----------+----+



In [19]:
import datetime

def get_stats(i, search, shortest_path, graph_data,
       start, end, desc, cache, part):
    return {
        "i": i,
        "search": search.rdd.getNumPartitions(),
        "shortest_path": shortest_path.rdd.getNumPartitions(),
        "graph_data": graph_data.rdd.getNumPartitions(),
        "time": str(end - start),
        # Elapsed time in seconds as a float
        "timen": (end - start).total_seconds(),
        "desc": desc,
        "cache": cache,
        "part": part,
    }
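
The `timen` field stores the elapsed time of one iteration in seconds. The standard-library check below shows how `timedelta.total_seconds()` relates to combining `.seconds` and `.microseconds` by hand; the sample durations are made up for illustration.

```python
import datetime

# For durations under one day both approaches agree (up to float rounding)
short = datetime.timedelta(seconds=4, microseconds=402319)
manual = short.seconds + short.microseconds * 10 ** -6
print(manual, short.total_seconds())

# .seconds excludes whole days, so the manual formula under-reports long runs
long_run = datetime.timedelta(days=1, seconds=5)
print(long_run.seconds, long_run.total_seconds())  # 5 86405.0
```

For the iteration times measured here (a few seconds each) the two approaches give the same value; `total_seconds()` is simply the safer choice for longer runs.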

In [20]:
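def procRDD(rdd, cache=True, part=False, npart=16):
    """Optionally repartition into npart partitions (part=True), then optionally cache."""
    # Despite the name, this helper is called with DataFrames below
    rdd = rdd.repartition(npart) if part else rdd
    return rdd.cache() if cache else rdd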

In [21]:
def run(
    search,
    graph_data,
    shortest_path,
    description,
    cache=True,
    part=False,
    npart=16,
    n=100,
):
    times = []
    for i in range(2, n + 2):

        # Initialize
        start = datetime.datetime.now()
        startnode_column = str(i - 1)

        # Add one node to paths
        search = search.join(
            graph_data.withColumnRenamed("ToNodeId", f"{i}"),
            F.col("FromNodeId") == F.col(startnode_column),
            how="left",
        )

        # Delete duplicates
        search = search.drop("FromNodeId").drop_duplicates(subset=[f"{i}"]).cache()

        # Filter out longer paths
        search = search.join(
            shortest_path.withColumnRenamed("destination-node-id", f"{i}"),
            on=[f"{i}"],
            how="leftanti",
        ).cache()

        # Add new paths to shortest path dataframe
        if not search.rdd.isEmpty():
            new_paths = procRDD(
                search.rdd.map(to_path).toDF(
                    ["17274", "destination-node-id", "path", "cost"]
                ),
                cache,
                part,
                npart,
            ).cache()

            shortest_path = procRDD(shortest_path.union(new_paths), cache, part, npart)
            print(f"{shortest_path.count()} shortest paths found. \n")
            end = datetime.datetime.now()
            times.append(
                get_stats(
                    i,
                    search,
                    shortest_path,
                    graph_data,
                    start,
                    end,
                    description,
                    cache,
                    part,
                )
            )
        else:
            break
    return times, shortest_path
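
The loop in `run` extends every path on the current frontier by one edge, keeps one path per newly reached node, and discards nodes that already have a shortest path. The same idea can be sketched in plain Python as a breadth-first search. The toy edge list and the function name `shortest_paths_from` are made up for illustration; this is not the Spark implementation, and as a choice of this sketch it never records a path back to the start node.

```python
from collections import defaultdict


def shortest_paths_from(start, edges):
    """Breadth-first search returning {node: path} for every node reachable from start."""
    neighbours = defaultdict(list)
    for src, dst in edges:
        neighbours[src].append(dst)

    found = {}                   # destination -> shortest path (list of nodes)
    frontier = {start: [start]}  # paths that were extended in the previous round
    while frontier:
        next_frontier = {}
        for node, path in frontier.items():
            for nxt in neighbours[node]:
                # Skip the start node, nodes reached in an earlier round,
                # and duplicates within this round
                if nxt == start or nxt in found or nxt in next_frontier:
                    continue
                next_frontier[nxt] = path + [nxt]
        found.update(next_frontier)
        frontier = next_frontier
    return found


# Made-up directed toy graph
edges = [(1, 2), (1, 3), (2, 4), (3, 4), (4, 5), (5, 1)]
paths = shortest_paths_from(1, edges)
for node, path in sorted(paths.items()):
    print(node, path, len(path) - 1)  # destination, path, cost
```

Node 4 is reachable through both 2 and 3 at the same cost; the sketch keeps whichever path it sees first, which corresponds to the `drop_duplicates` step in the loop above. The search stops when a round adds no new nodes, matching the `break` on an empty `search` DataFrame.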

In [22]:
times, shortest_paths = run(search, graph_data, shortest_path, "cache + partition", True, True, 32)

sqlContext.createDataFrame(times).select("i", "search", "graph_data", "shortest_path", "time", "desc").show()
shortest_paths.show()

21 shortest paths found. 

116 shortest paths found. 

647 shortest paths found. 

1934 shortest paths found. 

3342 shortest paths found. 

3916 shortest paths found. 

4084 shortest paths found. 

4137 shortest paths found. 

4151 shortest paths found. 

4158 shortest paths found. 





+---+------+----------+-------------+--------------+-----------------+
|  i|search|graph_data|shortest_path|          time|             desc|
+---+------+----------+-------------+--------------+-----------------+
|  2|   200|       100|           32|0:00:04.402319|cache + partition|
|  3|   200|       100|           32|0:00:04.248332|cache + partition|
|  4|   200|       100|           32|0:00:04.874804|cache + partition|
|  5|   200|       100|           32|0:00:07.244846|cache + partition|
|  6|   200|       100|           32|0:00:07.243627|cache + partition|
|  7|   200|       100|           32|0:00:06.901601|cache + partition|
|  8|   200|       100|           32|0:00:05.431574|cache + partition|
|  9|   200|       100|           32|0:00:05.299704|cache + partition|
| 10|   200|       100|           32|0:00:06.404287|cache + partition|
| 11|   200|       100|           32|0:00:06.319385|cache + partition|
+---+------+----------+-------------+--------------+-----------------+


### Save Results

In [23]:
shortest_paths.repartition(1).write.csv('output_b_ii', sep='|', header=True)