I used the following code to create my cluster. I was not able to use n1-standard-4 because of CPU quota constraints, which may have affected the performance.

```
# Names and locations used by the cluster-creation command below.
# Shell variables are assigned without a leading "$"; "$NAME=value" is not an
# assignment and would leave the variables empty.
CLUSTER=bgse-ds-hw03-cluster
PROJECT=bgsedatasciencehw
BUCKET=bgse_datascience_hw_bucket

# Create a two-worker Dataproc cluster with the Anaconda and Jupyter components.
gcloud beta dataproc clusters create $CLUSTER \
--optional-components=ANACONDA,JUPYTER \
--image-version=1.3 \
--enable-component-gateway \
--bucket $BUCKET \
--project $PROJECT \
--zone europe-west1-b \
--region europe-west1 \
--worker-machine-type n1-standard-2 \
--num-workers=2
```

In [41]:
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import scipy
from itertools import permutations

from scipy.sparse import coo_matrix
from google.cloud import storage

import json 
import datetime as dt


In [12]:
def create_file_list(bucket_name = "bgse-datawarehousing-random-tweets"):
    """Return the gs:// URI of every blob in a Cloud Storage bucket.

    Args:
        bucket_name: Name of the bucket to list.

    Returns:
        A list of "gs://<bucket_name>/<blob name>" strings, one for each blob
        yielded by ``bucket.list_blobs``. The list is empty when the bucket
        yields no blobs.
    """
    start = dt.datetime.now()
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    json_file_list = [
        "gs://" + bucket_name + "/" + blob.name
        for blob in bucket.list_blobs(prefix="")
    ]
    end = dt.datetime.now()
    # Report len() rather than a loop index: a loop index is never bound when
    # the listing is empty, which made this print raise UnboundLocalError.
    print("Create file list : [ {0} ] files found :: Elapsed time :: [ {1} ]".format(len(json_file_list), end - start))
    return json_file_list


In [13]:
def load_json_files(passed_list, stop=0):
    """Load the leading ``stop`` paths of ``passed_list`` with ``spark.read.json``.

    Relies on a module-level ``spark`` session being available.

    Args:
        passed_list: List of JSON file paths.
        stop: How many paths to read, counted from the start of the list.
            0 means "read all of them".

    Returns:
        Whatever ``spark.read.json`` returns for the selected paths.
    """
    began = dt.datetime.now()
    # A stop of 0 is the "no limit" sentinel.
    limit = len(passed_list) if stop == 0 else stop
    data = spark.read.json(passed_list[:limit])
    elapsed = dt.datetime.now() - began
    print("Load file list : [ {0} ] files :: Elapsed time :: [ {1} ]".format(limit, elapsed))
    return data

In [14]:
def create_value_list(data, backup=False):
    """Count ordered hashtag pairs that occur together in the same record.

    Args:
        data: Object exposing ``.rdd`` whose rows have an ``entities`` field
            with a ``hashtags`` sequence of items carrying ``.text``.
        backup: When true, also dump the result to 'value_list.json'.

    Returns:
        A list of (first_hashtag, second_hashtag, count) tuples, with the
        hashtags lower-cased. Both orderings of each pair are present because
        the pairs come from ``permutations(..., 2)``.
    """
    began = dt.datetime.now()

    # Only rows with at least two hashtags can contribute a pair.
    rows_with_pairs = data.rdd.filter(
        lambda row: (row.entities is not None) and (len(row.entities.hashtags) > 1))
    tag_lists = rows_with_pairs.map(
        lambda row: [tag.text.lower() for tag in row.entities.hashtags])

    # Emit every ordered pair once per row, then sum the occurrences per pair.
    pair_counts = tag_lists \
        .flatMap(lambda tags: permutations(tags, 2)) \
        .map(lambda pair: (pair, 1)) \
        .reduceByKey(lambda left, right: left + right)

    # Flatten ((a, b), n) into (a, b, n) and bring the result to the driver.
    value_list = pair_counts \
        .map(lambda item: (item[0][0], item[0][1], item[1])) \
        .collect()

    finished = dt.datetime.now()

    if backup:
        with open('value_list.json', 'w') as handle:
            json.dump(value_list, handle)

    print("Create value list :: Elapsed time :: [ {0} ]".format(finished - began))
    return value_list

In [15]:
def create_column_list(value_list, backup=False):
    """Assign a matrix index to every hashtag that appears in ``value_list``.

    Args:
        value_list: Iterable of (first_hashtag, second_hashtag, count) entries.
        backup: When true, also dump the mapping to 'column_names.json'.

    Returns:
        A dict mapping each hashtag to its position in the sorted list of
        distinct hashtags.
    """
    start = dt.datetime.now()

    # Collect names from BOTH positions. create_sparse_matrix looks up the
    # second element as well, so a name that only ever appears there would
    # otherwise have no index and raise KeyError. For symmetric input (every
    # pair present in both orders) this yields the same mapping as using the
    # first position alone.
    names = set()
    for entry in value_list:
        names.add(entry[0])
        names.add(entry[1])
    dict_column_names = {name: index for index, name in enumerate(sorted(names))}

    if backup:
        with open('column_names.json', 'w') as outfile:
            json.dump(dict_column_names, outfile)

    end = dt.datetime.now()
    print("Create column list :: Elapsed time :: [ {0} ]".format(end - start))
    return dict_column_names

In [16]:
def create_sparse_matrix(dict_column_names, value_list):
    """Build a square COO matrix of pair counts.

    Args:
        dict_column_names: Mapping from hashtag to matrix index.
        value_list: Sequence of entries whose items 0 and 1 are hashtags
            (row and column) and whose item 2 is the value to store.

    Returns:
        A ``coo_matrix`` with one row and one column per key of
        ``dict_column_names``.
    """
    began = dt.datetime.now()

    size = len(dict_column_names)
    # Translate hashtag names into integer coordinates.
    row_idx = np.array([dict_column_names[entry[0]] for entry in value_list])
    col_idx = np.array([dict_column_names[entry[1]] for entry in value_list])
    counts = np.array([entry[2] for entry in value_list])
    sp_mat = coo_matrix((counts, (row_idx, col_idx)), shape=(size, size))

    elapsed = dt.datetime.now() - began
    print("Create sparse matrix :: Elapsed time :: [ {0} ]".format(elapsed))
    return sp_mat

In [17]:
def reload_from_backup(col_names = 'column_names.json', val_list = 'value_list.json'):
    """Read the column mapping and the value list back from their JSON files.

    Args:
        col_names: Path of the JSON file holding the column-name mapping.
        val_list: Path of the JSON file holding the value list.

    Returns:
        A ``(value_list, dict_column_names)`` tuple. Note the order: the value
        list comes first even though its path is the second parameter.
    """
    began = dt.datetime.now()

    parsed = []
    for path in (col_names, val_list):
        with open(path, 'r') as handle:
            parsed.append(json.load(handle))
    dict_column_names, value_list = parsed

    finished = dt.datetime.now()
    print("Reload files :: Elapsed time :: [ {0} ]".format(finished - began))
    return value_list, dict_column_names

In [18]:
def runScript(backup = True, files = 0, singleFile = False):
    """Run the whole pipeline: list files, load them, count pairs, build the matrix.

    Args:
        backup: When true, the value list and column mapping are written to
            JSON files and then read back before the matrix is built.
        files: Number of input files to load. 0 means "all files found".
            Ignored (forced to 1) when ``singleFile`` is true.
        singleFile: When true, skip the bucket listing and process one
            hard-coded file only.

    Returns:
        The sparse matrix produced by ``create_sparse_matrix``. Earlier
        versions built the matrix and discarded it, returning None.
    """
    initial_start = dt.datetime.now()
    if singleFile:
        json_file_list = ["gs://bgse-datawarehousing-random-tweets/2019-02-26T00:00:30.657Z"]
        files = 1
    else:
        json_file_list = create_file_list()

    if files == 0:
        files = len(json_file_list)
        print("Files unset; running on entire set.")
    data = load_json_files(json_file_list, stop = files)

    value_list = create_value_list(data, backup)
    dict_column_names = create_column_list(value_list, backup)

    if backup:
        # NOTE(review): this re-reads the files just written; presumably a
        # check that the backup can be loaded -- confirm it is intentional.
        value_list, dict_column_names = reload_from_backup()
    sp_mat = create_sparse_matrix(dict_column_names, value_list)

    print("Overall Script :: Elapsed time :: [ {0} ]".format(dt.datetime.now() - initial_start))
    return sp_mat


In [19]:
def load_from_files():
    """Rebuild the sparse matrix from the default JSON backup files.

    Returns:
        The matrix returned by ``create_sparse_matrix`` for the reloaded data.
    """
    values, columns = reload_from_backup()
    return create_sparse_matrix(columns, values)

The cell below runs the script, but for the full dataset the notebook stops producing output partway through. As a result, I backed up the values to .json files, which allow the sparse matrix to be rebuilt. I think that running it as a standalone script might improve the process.

In [None]:
# Log when the run begins, then run the pipeline with its default settings.
print("Start time :: {0}".format(dt.datetime.now()))
runScript()


Start time :: 2019-10-28 13:39:45.690149
Create file list : [ 6108 ] files found :: Elapsed time :: [ 0:00:00.688136 ]
Files unset; running on entire set.
Load file list : [ 6109 ] files :: Elapsed time :: [ 0:27:37.927885 ]


In [39]:
# Rebuild the matrix from the JSON backup files.
sp_mat = load_from_files()
# A bare name on the last line of a notebook cell displays the object's repr.
sp_mat

Reload files :: Elapsed time :: [ 0:00:05.221110 ]
Create sparse matrix :: Elapsed time :: [ 0:00:04.153591 ]


<260019x260019 sparse matrix of type '<type 'numpy.int64'>'
	with 2561237 stored elements in COOrdinate format>

In [42]:
# Write the matrix to a compressed .npz file at the relative path "sparseMatrix.npz".
scipy.sparse.save_npz("sparseMatrix.npz", sp_mat, compressed=True)

I then opened a terminal through Jupyter and ran the following:
```
# Copy the saved matrix into the homework bucket.
# NOTE(review): assumes sparseMatrix.npz was written to / by the notebook -- confirm.
cd /
gsutil cp sparseMatrix.npz gs://bgse_datascience_hw_bucket
```

Then, on my local machine, I went to my GitHub folder and ran the following:
```
# Download the matrix and the notebooks into the local homework folder.
cd $GITHUB/DataScience/Trimester01/DataWarehousingAndBusinessIntelligence/warehousing-spark-tutorial/homework
gsutil cp gs://bgse_datascience_hw_bucket/sparseMatrix.npz .
# notebooks/jupyter is a folder, so the copy must be recursive (-r);
# without it gsutil skips the prefix instead of copying its contents.
gsutil cp -r gs://bgse_datascience_hw_bucket/notebooks/jupyter .
```