# Data preparation notebook using BigQuery in GCP 

#### NB:
There are two parts in this notebook. The first creates tables in a BigQuery dataset, to be used later in the processing stages on Google Cloud Platform. The second creates an .mtx file for each dataset (the adjacency matrix of each network, in Matrix Market format), to be used either on GCP or on a local machine.

In [5]:
!scala -version

Scala code runner version 2.12.10 -- Copyright 2002-2019, LAMP/EPFL and Lightbend, Inc.


In [19]:
from scipy.sparse import csr_matrix
from scipy.io import mmwrite
import numpy as np
import pandas as pd
from google.cloud import storage
from google.cloud import bigquery
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder \
  .appName('data-preparation')\
  .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar') \
  .getOrCreate()

### Part one: creating the BigQuery tables

In [12]:
files = ['CA-AstroPh', 'CA-GrQc', 'CA-HepPh', 'CA-HepTh']
# the Cloud Storage bucket holding each dataset's edge list
BUCKET_NAME = 'rplace-bucket'
# GCP project ID and BigQuery dataset name
project_name = "bigdata-project-346922"
dataset = "collaboration_data"

client = storage.Client()
bucket = client.get_bucket(BUCKET_NAME)
# download each dataset's edge list from Cloud Storage into the working directory
for file in files:
    blob = bucket.get_blob('data/' + file + '.txt')
    blob.download_to_filename(file + '.txt')

In [14]:
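# lightweight record type for one edge
edge = Row('fromId', 'toId')
client = bigquery.Client()
# explicit INTEGER schema for the edge columns; WRITE_TRUNCATE replaces
# the table's data if the table already exists
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("fromId", bigquery.enums.SqlTypeNames.INTEGER),
        bigquery.SchemaField("toId", bigquery.enums.SqlTypeNames.INTEGER),
    ],
    write_disposition="WRITE_TRUNCATE",
)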

In [17]:
# iterate over all files to create an edges table for each dataset
for file in files:
    edges = []
    indexTable = []
    index = 1
    with open(file + '.txt', 'r') as f:
        print("Reading the {0} dataset file".format(file))
        content = f.readlines()
    # lines 0-3 are header comments, not edges
    for line in content[4:]:
        i, j = map(int, line.split()[:2])
        edges.append(edge(i, j))
        indexTable.append(index)
        index += 1
    table_id = project_name + '.' + dataset + '.' + file
    df = pd.DataFrame(
        edges,
        columns=["fromId", "toId"],
        index=pd.Index(indexTable, name="id"),
    )
    job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
    # wait for the load job to finish (raises an exception if it failed)
    job.result()

Reading the CA-AstroPh dataset file
Reading the CA-GrQc dataset file
Reading the CA-HepPh dataset file
Reading the CA-HepTh dataset file
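
The loops in this notebook skip exactly four header lines (`content[4:]`). A slightly more defensive sketch skips any line that starts with `#` instead, so it does not depend on the header length; it builds a DataFrame with the same `fromId`/`toId` columns and 1-based `id` index as the loop above. It assumes header lines start with `#`; the helper name `edges_to_dataframe` and the sample lines are hypothetical.

```python
import io

import pandas as pd


def edges_to_dataframe(lines):
    """Build the edges DataFrame from an iterable of text lines.

    Assumption: header lines start with '#'; blank lines are ignored too.
    """
    pairs = []
    for line in lines:
        line = line.strip()
        if not line or line.startswith('#'):
            continue  # header comment or blank line, not an edge
        from_id, to_id = line.split()[:2]
        pairs.append((int(from_id), int(to_id)))
    # 1-based "id" index, like indexTable in the loading loop
    index = pd.Index(range(1, len(pairs) + 1), name="id")
    return pd.DataFrame(pairs, columns=["fromId", "toId"], index=index)


# hypothetical sample: two header lines followed by tab-separated edges
sample = io.StringIO("# header line 1\n# header line 2\n1\t2\n2\t1\n2\t3\n3\t2\n")
df = edges_to_dataframe(sample)
print(df)
```

In the notebook, `with open(file + '.txt') as f: df = edges_to_dataframe(f)` would replace the parsing part of the loop.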


#### Verification step:
For each created table, we request its metadata from the BigQuery API to check that it exists and has been populated.

In [18]:
# check that the tables exist and that data has been loaded
for file in files:
    table_id = project_name + '.' + dataset + '.' + file
    table = client.get_table(table_id)
    print(
        "Loaded {} rows and {} columns to {}".format(
            table.num_rows, len(table.schema), table_id
        )
    )

Loaded 396160 rows and 3 columns to bigdata-project-346922.collaboration_data.CA-AstroPh
Loaded 28980 rows and 3 columns to bigdata-project-346922.collaboration_data.CA-GrQc
Loaded 237010 rows and 3 columns to bigdata-project-346922.collaboration_data.CA-HepPh
Loaded 51971 rows and 3 columns to bigdata-project-346922.collaboration_data.CA-HepTh
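
The cell above prints row counts but does not compare them with anything. A minimal sketch of an actual check, assuming the local `.txt` files are still in the working directory: count the data lines the loading loop uploads (everything after the four header lines) and compare that number with `table.num_rows`. `count_edge_lines` is a hypothetical helper, and since the comparison needs BigQuery access it appears only as a comment; the runnable part uses a small made-up file.

```python
import os
import tempfile


def count_edge_lines(path, header_lines=4):
    """Count non-blank lines after the first `header_lines` lines,
    i.e. the lines the loading loop turns into rows (content[4:])."""
    with open(path) as f:
        return sum(1 for k, line in enumerate(f) if k >= header_lines and line.strip())


# In the notebook (requires BigQuery access):
#     for file in files:
#         table = client.get_table(project_name + '.' + dataset + '.' + file)
#         assert table.num_rows == count_edge_lines(file + '.txt'), file

# self-contained demo on a hypothetical file
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'toy.txt')
    with open(path, 'w') as f:
        f.write("# h1\n# h2\n# h3\n# h4\n1\t2\n2\t1\n")
    n_edges = count_edge_lines(path)
print(n_edges)
```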


### Part two: creating sparse matrices for use without BigQuery
#### Creation step:
For each file, we build the network's adjacency matrix in sparse (CSR) format, to keep storage requirements low, and write it to a Matrix Market (.mtx) file.
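
The next cell maps node IDs to contiguous matrix indices with a dictionary. The same remapping can be done with NumPy: `np.unique(..., return_inverse=True)` returns the sorted distinct IDs plus, for each input element, its position in that sorted array. This is a sketch on a tiny hypothetical edge list, not the notebook's code.

```python
import numpy as np
from scipy.sparse import csr_matrix

# hypothetical edge list whose node IDs are not contiguous
src = np.array([10, 42, 42, 7])
dst = np.array([42, 10, 7, 42])

# ids: sorted distinct node IDs; inverse: position of each input ID in ids,
# i.e. the old-ID -> new-index mapping applied element-wise
ids, inverse = np.unique(np.concatenate([src, dst]), return_inverse=True)
rows, cols = inverse[:len(src)], inverse[len(src):]
n = len(ids)

# duplicate (row, col) pairs, if any, would be summed during construction
adj = csr_matrix((np.ones(len(src), dtype=np.uint16), (rows, cols)), shape=(n, n))
print(ids)
print(adj.toarray())
```

The matrix is `n x n`, where `n` is the number of distinct nodes, instead of being sized by the largest node ID.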

In [20]:
for file in files:
    rows = []
    cols = []
    print("Creating sparse matrix for {0} dataset".format(file))
    # read the file content as a list of lines
    with open(file + '.txt') as f:
        content = f.readlines()
    # lines 0-3 are header comments, not edges
    for line in content[4:]:
        i, j = map(int, line.split()[:2])
        rows.append(i)
        cols.append(j)
    # node IDs may not be contiguous, so each distinct ID is mapped to an
    # index in 0..n-1; the matrix is then n x n (n = number of nodes)
    # instead of being sized by the largest ID
    identifiersList = sorted(set(rows + cols))
    dic = {node_id: k for k, node_id in enumerate(identifiersList)}
    rows = [dic[r] for r in rows]
    cols = [dic[c] for c in cols]
    n = len(identifiersList)
    data = [1] * len(rows)
    m = csr_matrix((data, (rows, cols)), shape=(n, n), dtype=np.uint16)
    mmwrite(file + '.mtx', m)
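
To check that a written .mtx file reads back to the same matrix, `scipy.io.mmread` can be used for a round trip. A minimal sketch on a small hypothetical symmetric matrix, written to a temporary directory rather than to the dataset files:

```python
import os
import tempfile

import numpy as np
from scipy.io import mmread, mmwrite
from scipy.sparse import csr_matrix

# small hypothetical symmetric adjacency matrix (node 0 linked to nodes 1 and 2)
m = csr_matrix(np.array([[0, 1, 1],
                         [1, 0, 0],
                         [1, 0, 0]]))

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'example.mtx')
    mmwrite(path, m)
    # mmread returns a COO matrix for coordinate-format files; convert to CSR
    back = mmread(path).tocsr()

# True when no entry differs between the original and the re-read matrix
same = (back != m).nnz == 0
print(back.shape, back.nnz, same)
```

Applied to the notebook, the same comparison would read `file + '.mtx'` and compare it with `m` inside the loop.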

#### Verification step:
We list the .mtx files created in the previous step. (`email.mtx` is not in `files`, so it was not produced by the loop above.)

In [24]:
# verify that the .mtx files were created
!ls ./*.mtx

./CA-AstroPh.mtx  ./CA-GrQc.mtx  ./CA-HepPh.mtx  ./CA-HepTh.mtx  ./email.mtx
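
Beyond `ls`, `scipy.io.mminfo` reads a Matrix Market file's header and reports its dimensions and storage details, which is a cheap way to confirm each matrix has the expected size. A sketch, where `describe_mtx_files` is a hypothetical helper and the demo writes its own small matrix so it runs anywhere:

```python
import tempfile
from pathlib import Path

import numpy as np
from scipy.io import mminfo, mmwrite
from scipy.sparse import csr_matrix


def describe_mtx_files(directory):
    """Map each .mtx file name in `directory` to its Matrix Market header
    info: (rows, cols, entries, format, field, symmetry)."""
    return {p.name: mminfo(str(p)) for p in sorted(Path(directory).glob('*.mtx'))}


with tempfile.TemporaryDirectory() as tmp:
    # write one small hypothetical matrix so the example is self-contained
    mmwrite(str(Path(tmp) / 'toy.mtx'), csr_matrix(np.eye(3, dtype=np.int64)))
    info = describe_mtx_files(tmp)

for name, (rows, cols, entries, fmt, field, symmetry) in info.items():
    print(name, rows, cols, entries, fmt, field, symmetry)
```

In the notebook, `describe_mtx_files('.')` would report each file's dimensions, which should equal the number of distinct nodes in the corresponding dataset.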


#### Optional step:
Here, we copy the .mtx files to our working bucket in Cloud Storage. The `./*.mtx` pattern uploads every .mtx file in the directory, including `email.mtx`.

In [28]:
!gsutil -m cp ./*.mtx gs://rplace-bucket/data

Copying file://./CA-AstroPh.mtx [Content-Type=application/octet-stream]...
Copying file://./CA-GrQc.mtx [Content-Type=application/octet-stream]...         
Copying file://./CA-HepPh.mtx [Content-Type=application/octet-stream]...        
Copying file://./CA-HepTh.mtx [Content-Type=application/octet-stream]...        
Copying file://./email.mtx [Content-Type=application/octet-stream]...           
/ [5/5 files][  4.3 MiB/  4.3 MiB] 100% Done                                    
Operation completed over 5 objects/4.3 MiB.                                      
