---
# **DATA PARSER and GRAPH CONSTRUCTION**
using `PySpark` and `networkx`

---

In [None]:
# Colab environment setup: install PySpark, PyDrive and a headless Java 8 JDK.
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
# Point JAVA_HOME at the JDK installed above (PySpark launches a JVM and locates it through this variable).
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u292-b10-0ubuntu1~18.04).
0 upgraded, 0 newly installed, 0 to remove and 37 not upgraded.


In [None]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate the Colab user, then build a PyDrive client from the
# application-default credentials obtained by that login.
# NOTE(review): `drive` is not referenced by any later cell visible in this
# notebook (the CSV is read from session storage) — confirm it is still needed.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [None]:
# LIBRARIES 

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext

import numpy as np
import networkx as nx
from google.colab import files

In [None]:
# Stop whichever SparkContext is currently active (one is created if none
# exists), presumably so the session below starts from a clean state.
SparkContext.getOrCreate().stop()

# Build the Spark session and expose its context under the usual name `sc`
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

### Upload the data and remove header

Drag the selected CSV file into the `Files` panel on the left to upload it to session storage. 

Write the selected CSV filename into the following cell.

In [None]:
# Name of the uploaded CSV file; must be filled in before running this cell.
filename = ""
if not filename:
  # Fail early with a readable message instead of an obscure Spark path error.
  raise ValueError("Set `filename` to the name of the uploaded CSV file before running this cell.")

# Read the file as an RDD of text lines, one element per CSV row.
raw_lines = sc.textFile(filename)

# The first line is the column header; drop every line equal to it so that
# `lines` holds data rows only.
header = raw_lines.first()
lines = raw_lines.filter(lambda line: line != header)

---
---

# **STEP 1**

Step 1 is divided into three blocks:


> **Block 1:** construction of `normalized_unique_pairs`


> **Block 2:** construction of `basket_to_product`


> **Block 3:** construction of `hh_to_basket`







---
## **Block 1**

Construction of RDD pipeline `normalized_unique_pairs` with all pairs with normalized weights 
: `(key: PRODUCT PAIR ---> value: WEIGHT)`

> **Before:** lines of format as in original CSV: 
`household_key,BASKET_ID,DAY,PRODUCT_ID,QUANTITY,SALES_VALUE,STORE_ID,RETAIL_DISC,TRANS_TIME,WEEK_NO,
COUPON_DISC,COUPON_MATCH_DISC`

> **After:** lines of format `((product_ID_1, product_ID_2), normalized_weight)`











### Helper functions 
#### for Block 1

In [None]:
def make_list(line):
  """
    Parse one CSV data row into a (basket, product) pair.

    Input: line of format as in original CSV
    Output: a pair formatted as
            (basket_ID, product_ID)

    Only the first whitespace-delimited token of the line is parsed;
    comma-separated fields 1 and 3 (0-based) of that token are converted to int.
  """
  fields = line.split()[0].split(",")
  return int(fields[1]), int(fields[3])


def generate_pairs_of_elements(line):
  """
    Build every ordered product pair of one basket, each with a raw weight.

    Input: line of RDD pipeline of format
            (basket_id, [prod_id_1, prod_id_2, ... prod_id_n])
    Output: list of ((prod_a, prod_b), weight) entries, grouped by first product:
                - the self-loop (prod_a, prod_a) comes first with weight 0
                - then every pair with a different second product, weight 1
             --> [((p1, p1), 0), ((p1, p2), 1), ..., ((p2, p2), 0), ((p2, p1), 1), ...]
  """
  products = line[1]

  weighted_pairs = []
  for first in products:
    # Self-loop entry for this product
    weighted_pairs.append(((first, first), 0))
    # Co-occurrence entries with every other product of the basket
    weighted_pairs.extend(((first, second), 1) for second in products if first != second)

  return weighted_pairs

### Execution cell 
#### for Block 1

In [None]:
# (basket_ID, product_ID) for every CSV row
basket_list = lines.map(make_list)
# (basket_ID, [product_ID, ...])
baskets_content = basket_list.groupByKey().mapValues(list)
# ((prod_a, prod_b), 0 or 1) for every ordered pair inside each basket
generated_pairs = baskets_content.flatMap(generate_pairs_of_elements)
# Per-pair list of raw weights. No longer used by the pipeline below; the
# name is kept for inspection and costs nothing unless an action is run on it.
unique_pairs = generated_pairs.groupByKey().mapValues(list)
# Total weight per pair. reduceByKey pre-aggregates inside each partition,
# so the full list of weights per pair is never shuffled or materialised.
sum_unique_pairs = generated_pairs.reduceByKey(lambda a, b: a + b)
# Self-loops only ever receive weight 0, so their sum is 0; give them weight 1.
sum_unique_pairs_correction = sum_unique_pairs.mapValues(lambda total: 1 if total == 0 else total)
# Largest weight over all pairs is the normalisation constant.
W = sum_unique_pairs_correction.values()
norm = W.max()
# ((prod_a, prod_b), weight / norm)
normalized_unique_pairs = sum_unique_pairs_correction.mapValues(lambda total: total / norm)

---
# **Block 2**
Construction of RDD pipeline `basket_to_product` with all pairs 
`(key: BASKET (time_stamp) ---> value: PRODUCT pair)`


> **Before:** lines of format as in original CSV:
`household_key,BASKET_ID,DAY,PRODUCT_ID,QUANTITY,SALES_VALUE,STORE_ID,RETAIL_DISC,TRANS_TIME,WEEK_NO,
COUPON_DISC,COUPON_MATCH_DISC`




> **After:** lines of format `((basket_ID, (time_stamp)), (product_ID_1, product_ID_2))`







### Helper functions 
#### for Block 2

In [None]:
def make_line_basket_product(line):
  """
    Parse one CSV data row into a basket key and the product bought.

    Input: line of format as in original CSV
    Output: ((basket_ID, (time_stamp)), product_ID)
            where time_stamp = (WEEK_NO, DAY, TRANS_TIME), i.e. CSV fields 9, 2, 8

    Only the first whitespace-delimited token of the line is parsed.
    The household key (field 0) is not part of the output and is not read.
  """
  fields = line.split()[0].split(",")

  time_stamp = (int(fields[9]), int(fields[2]), int(fields[8]))
  basket_id = int(fields[1])
  product_id = int(fields[3])

  return (basket_id, time_stamp), product_id



def KEY_generate_pairs_of_elements(line):
  """
    Attach a basket key to every ordered product pair of that basket.

    Input: one line of RDD pipeline of format
            ((basket_ID, (time_stamp)), [prod_id_1, prod_id_2, ... prod_id_n])
    Output: generator of (key, pair) records, grouped by first product:
             --> (key, (p1, p1)), (key, (p1, p2)), ..., (key, (p2, p2)), (key, (p2, p1)), ...
  """
  basket_key = line[0]
  products = line[1]

  # The pairs are built eagerly; only the keyed view over them is lazy.
  combos = []
  for first in products:
    combos.append((first, first))
    combos.extend((first, second) for second in products if first != second)

  return ((basket_key, combo) for combo in combos)


### Execution cell 
#### for Block 2

In [None]:
# ((basket_ID, time_stamp), product_ID) for every CSV row
house_time_products_line = lines.map(make_line_basket_product)

# Gather the products of each basket key into one list
grouped_basket = (
  house_time_products_line
  .groupByKey()
  .mapValues(list)
)

# One record per (basket key, ordered product pair)
basket_to_product = grouped_basket.flatMap(KEY_generate_pairs_of_elements)

---
# **Block 3**

Construction of RDD pipeline `hh_to_basket` with all pairs `(key: HOUSEHOLD ---> value: BASKET (time_stamp))`


> **Before:** lines of format as in original CSV:
`household_key,BASKET_ID,DAY,PRODUCT_ID,QUANTITY,SALES_VALUE,STORE_ID,RETAIL_DISC,TRANS_TIME,WEEK_NO,
COUPON_DISC,COUPON_MATCH_DISC`

> **After:** lines of format `(HOUSEHOLD_KEY, (BASKET_ID, (time_stamp)))`




### Helper functions 
#### for Block 3

In [None]:
def make_line_household_basket(line):
  """
    Parse one CSV data row into a household and the basket key it bought.

    Input: line of format as in original CSV
    Output: (household_key, (basket_ID, (time_stamp)))
            where time_stamp = (WEEK_NO, DAY, TRANS_TIME), i.e. CSV fields 9, 2, 8

    Only the first whitespace-delimited token of the line is parsed.
  """
  fields = line.split()[0].split(",")

  household = int(fields[0])
  basket = int(fields[1])
  stamp = tuple(int(fields[i]) for i in (9, 2, 8))

  return household, (basket, stamp)

### Execution cell 
#### for Block 3

In [None]:
# (household_key, (basket_ID, time_stamp)) for every CSV row
house_basket_line = lines.map(make_line_household_basket)
# A basket yields one identical record per product row; keep each
# (household, basket) record once. distinct() alone does this, so no
# group-then-flatten round trip is needed beforehand.
hh_to_basket = house_basket_line.distinct()

---
--- 

# **STEP 2**


Construction of RDD pipeline `JOINED` by joining the data from STEP 1: `(HOUSEHOLD ---> (BASKETS ---> PRODUCTS))`



> **Before:** Three RDD pipelines from step 1:
*   *Block 1:* `normalized_unique_pairs`:
lines of format `((product_ID_1, product_ID_2), normalized_weight)`
*   *Block 2:* `basket_to_product`:
lines of format `((basket_ID, (time_stamp)), (product_ID_1, product_ID_2))`
*   *Block 3:* `hh_to_basket`:
lines of format `(HOUSEHOLD_KEY, (BASKET_ID, (time_stamp)))`








> **After:** lines of format
```
(HOUSEHOLD_KEY, 
    [(BASKET_ID_1, [all PRODUCT pairs of BASKET_ID_1 with normalized weight]), 
     (BASKET_ID_2, [all PRODUCT pairs of BASKET_ID_2 with normalized weight]),
     ... 
     (BASKET_ID_n, [all PRODUCT pairs of BASKET_ID_n with normalized weight])
    ]
)
```



### Helper functions 
#### for Step 2

In [None]:
def sort_baskets_time(line):
  """
    Order the baskets of one household chronologically.

    Input: (HOUSEHOLD_KEY, [((BASKET_ID, (time_stamp)), [weighted product pairs]), ...])
    Output: (HOUSEHOLD_KEY, same baskets sorted by increasing time_stamp)

    Each basket's key is (BASKET_ID, time_stamp), so the time stamp is
    element [0][1] of a basket entry. BASKET_ID breaks ties between equal
    time stamps so the order is deterministic.
  """
  key = line[0]
  return key, sorted(line[1], key = lambda basket: (basket[0][1], basket[0][0]))

### Execution cell 
#### for Step 2



> *Comment 1:* Baskets are sorted by increasing time stamp



> *Comment 2:* Only households with more than 5 baskets (purchases) in the selected time period are considered




In [None]:
# A household is kept only if it has more than this many baskets
MIN_BASKETS = 5

# Key each weighted pair by the pair itself: ((p1, p2), ((p1, p2), weight))
normalized = normalized_unique_pairs.map(lambda x: (x[0], x))

# Attach the normalized weight to every (basket key, pair) record:
# ((basket_ID, time_stamp), ((p1, p2), weight))
pairs_join_normalized = (
  basket_to_product
  .map(lambda line: (line[1], line[0]))
  .join(normalized)
  .map(lambda line: line[1])
)

# ((basket_ID, time_stamp), [((p1, p2), weight), ...])
pairs_join_normalized_grouped = pairs_join_normalized.groupByKey().mapValues(list)

# (household_key, ((basket_ID, time_stamp), [((p1, p2), weight), ...]))
JOIN_ALL = (
  hh_to_basket
  .map(lambda line: (line[1], line[0]))
  .join(pairs_join_normalized_grouped)
  .map(lambda line: (line[1][0], (line[0], line[1][1])))
)

# Group the baskets per household, drop households with too few baskets
# before paying for the sort, then order the remaining baskets.
# Cached because Step 3 consumes JOINED in two separate jobs.
JOINED = (
  JOIN_ALL
  .groupByKey()
  .mapValues(list)
  .filter(lambda line: len(line[1]) > MIN_BASKETS)
  .map(sort_baskets_time)
  .cache()
)

---
---

# **STEP 3**

Construction of RDD pipeline `GRAPHS_RDD` from data from STEP 2: `(HOUSEHOLD ---> list of networkx GRAPHS)`



> **Before:** lines of format
```
(HOUSEHOLD_KEY, 
    [(BASKET_ID_1, [all PRODUCT pairs of BASKET_ID_1 with normalized weight]), 
     (BASKET_ID_2, [all PRODUCT pairs of BASKET_ID_2 with normalized weight]),
     ... 
     (BASKET_ID_n, [all PRODUCT pairs of BASKET_ID_n with normalized weight])
    ]
)
```


> **After:** lines of format
```
(HOUSEHOLD, [GRAPHS of class <networkx.classes.graph.Graph at 0x7f8c3752cbd0>])
```






### Helper functions 
#### for Step 3

In [None]:
def make_prod_list(line):
  """
    List the products appearing in any basket of one household.

    Input: (HOUSEHOLD_KEY, [(basket key, [((prod_1, prod_2), weight), ...]), ...])
    Output: (HOUSEHOLD_KEY, [product IDs]), each product listed once, in the
            order it is first encountered

    The result is collected to the driver and used as a graph node set, so
    repeating a product for every pair it occurs in would only inflate it.
  """
  # A dict is used as an insertion-ordered set
  seen = {}
  for basket in line[1]:
    for weighted_pair in basket[1]:
      prod1, prod2 = weighted_pair[0]
      seen[prod1] = None
      seen[prod2] = None
  return line[0], list(seen)


def make_graphs(prod_for_house):
  """
    Create the mapper that turns one JOINED record into networkx graphs.

    Input: prod_for_house, a mapping household_ID -> product IDs
    Output: a function taking
              (HOUSEHOLD_KEY, [(basket key, [((prod_1, prod_2), weight), ...]), ...])
            and returning (HOUSEHOLD_KEY, [one nx.Graph per basket])
  """
  def make_graphs_(line):
    house_ID = line[0]
    # All graphs of one household share the same node set
    nodes = prod_for_house[house_ID]
    baskets = line[1]

    graphs = []
    # One graph per basket, with an edge for every weighted product pair
    for basket in baskets:
      G = nx.Graph()
      G.add_nodes_from(nodes)

      for weighted_pair in basket[1]:
        n1, n2 = weighted_pair[0]
        G.add_edge(n1, n2, weight = weighted_pair[1])

      graphs.append(G)

    return house_ID, graphs
  return make_graphs_

### Execution cell 
#### for Step 3

In [None]:
# Bring each household's product IDs to the driver as (household, [product IDs]) records
products_for_house = JOINED.map(make_prod_list).collect()

# Lookup table household -> product IDs; used as the node set shared by
# all graphs of that household
prod_for_house = dict(products_for_house)

# (household, [one graph per basket])
GRAPHS_RDD = JOINED.map(make_graphs(prod_for_house))

-----
-----

# STEP 4

Collect the generated graphs from STEP 3 and save them into the folder `Graphs`.




### Execution cell 
#### for Step 4

In [None]:
GRAPHS = GRAPHS_RDD.collect()

!mkdir Graphs
for element in GRAPHS:
  house_ID = element[0]
  list_of_graphs = element[1]
  for i, G in enumerate(list_of_graphs):
    nx.write_pajek(G, f"Graphs/{house_ID}_{i}.net")

mkdir: cannot create directory ‘Graphs’: File exists


Run the following cell to zip the graphs into `Graphs_zip.zip` and download the archive.

In [None]:
# Archive the Graphs folder recursively.
# NOTE(review): zip updates an already existing archive in place (see the
# "updating:" lines in the output), so files from earlier runs stay in it —
# confirm this is intended.
!zip -r /content/Graphs_zip.zip /content/Graphs
# Offer the archive as a browser download through the Colab `files` helper.
files.download("/content/Graphs_zip.zip")

updating: content/Graphs/ (stored 0%)
updating: content/Graphs/1096_1.net (deflated 73%)
updating: content/Graphs/1351_1.net (deflated 72%)
updating: content/Graphs/404_2.net (deflated 88%)
updating: content/Graphs/1479_7.net (deflated 80%)
updating: content/Graphs/884_10.net (deflated 72%)
updating: content/Graphs/122_6.net (deflated 77%)
updating: content/Graphs/924_0.net (deflated 85%)
updating: content/Graphs/294_1.net (deflated 75%)
updating: content/Graphs/1228_8.net (deflated 80%)
updating: content/Graphs/1828_7.net (deflated 83%)
updating: content/Graphs/575_1.net (deflated 84%)
updating: content/Graphs/1916_3.net (deflated 73%)
updating: content/Graphs/1369_2.net (deflated 75%)
updating: content/Graphs/1569_1.net (deflated 68%)
updating: content/Graphs/1828_13.net (deflated 76%)
updating: content/Graphs/2110_5.net (deflated 76%)
updating: content/Graphs/988_7.net (deflated 88%)
updating: content/Graphs/1364_1.net (deflated 72%)
updating: content/Graphs/1584_5.net (deflated 75%

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>