In [None]:
# Mount Google Drive so the project datasets stored there are reachable from Colab.
from google.colab import drive
drive.mount('/content/drive')
# Work from the project folder: relative paths used later in the notebook
# (e.g. "Project datasets/...") are resolved against this directory.
%cd /content/drive/MyDrive/EPFL HS21/Applied data analysis CS-401/Project/ADA team winner/Project Milestone 2

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/MyDrive/EPFL HS21/Applied data analysis CS-401/Project/ADA team winner/Project Milestone 2


In [None]:
import json
import pandas as pd

# Loading the Wikidata datasets into Google BigQuery (BQ)
The goal of this notebook is to load the Wikidata datasets from Google Drive into BQ.

There are 2 types of datasets:
1. speaker attributes
2. wikidata

The source files to be loaded into BQ are stored in Google Drive. This notebook reads the large files in chunks (so that each chunk fits in memory) and uploads them to BQ after some light pre-processing.

In [None]:
# Setup of the Google Cloud utilities
from google.cloud import bigquery
from google.oauth2 import service_account
import json, os

# Google Cloud services
# Service-account key file. It is opened by relative path, so it must be in the
# current working directory, i.e. the Drive project folder selected by `%cd` above.
# Do not commit this key file to version control.
gcp_service_account_credentials_json_filename = 'epfl-course-f41b0ed796f9.json'
# Also expose the key through the standard environment variable so that any other
# Google client library created later picks up the same identity.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = gcp_service_account_credentials_json_filename
credentials = service_account.Credentials.from_service_account_file(
    gcp_service_account_credentials_json_filename,
    scopes=['https://www.googleapis.com/auth/bigquery', 'https://www.googleapis.com/auth/drive'],
)
project_id = 'epfl-course'
# A single client, bound to the explicit credentials, scopes and project above.
# (A second `bigquery.Client()` used to be created right after this line, silently
# discarding the explicit configuration.)
bigquery_client = bigquery.Client(credentials=credentials, project=project_id)

#Execute a query in BQ
def bq_execute_query(query, mode="INTERACTIVE", wait=False, to_dataframe=False):
    """Submit a SQL query to BigQuery through the module-level `bigquery_client`.

    Args:
        query: SQL text to execute.
        mode: Query priority, "INTERACTIVE" (default) or "BATCH" (case-insensitive).
            BATCH queries are queued and do not count toward the concurrent
            interactive-query rate limit.
        wait: If true, block until the job finishes and print its result.
        to_dataframe: If true, return the query result as a pandas DataFrame.

    Returns:
        A pandas DataFrame when `to_dataframe` is true, otherwise the QueryJob.

    Raises:
        ValueError: If `mode` is not a supported query priority.
    """
    priority_name = str(mode).upper()
    if priority_name not in ("INTERACTIVE", "BATCH"):
        raise ValueError("mode must be 'INTERACTIVE' or 'BATCH', got {!r}".format(mode))
    # Use the QueryPriority constant itself. The previous code sent the literal
    # text "bigquery.QueryPriority.<mode>", which is not a valid priority value.
    job_config = bigquery.QueryJobConfig(priority=getattr(bigquery.QueryPriority, priority_name))
    query_job = bigquery_client.query(query, job_config=job_config)
    if wait:
        print("Executed BQ query: ", query_job.result())
    if to_dataframe:
        return query_job.to_dataframe()
    return query_job

#Upload a DF to BQ
def upload_df_to_bq(df, bq_destination_table, write_disposition="WRITE_APPEND"):
    """Load a pandas DataFrame into a BigQuery table and wait for the load job.

    The destination table is created if it does not exist ("CREATE_IF_NEEDED").

    Args:
        df: DataFrame to upload.
        bq_destination_table: Fully qualified table id, e.g. "epfl-course.dataset.table".
        write_disposition: BigQuery write disposition; the default "WRITE_APPEND"
            adds the rows to whatever the table already contains.
    """
    load_config = bigquery.LoadJobConfig(
        create_disposition="CREATE_IF_NEEDED",
        write_disposition=write_disposition,
    )
    load_job = bigquery_client.load_table_from_dataframe(
        df,
        bq_destination_table,
        job_config=load_config,
    )
    # result() blocks until the load job has completed.
    finished_job = load_job.result()
    print("Uploaded DF to BQ: ", finished_job)

def upload_parquet_to_bq(parquet_file, bq_destination_table, write_disposition="WRITE_APPEND"):
    """Load a Parquet file into a BigQuery table and wait for the load job.

    The speaker-attribute columns listed below are declared as REPEATED STRING
    fields. The table is created if it does not exist ("CREATE_IF_NEEDED").

    Args:
        parquet_file: Either an open binary file object, or a filesystem path
            (str / os.PathLike) that is opened in binary mode for the upload.
        bq_destination_table: Fully qualified table id, e.g. "epfl-course.dataset.table".
        write_disposition: BigQuery write disposition; the default "WRITE_APPEND"
            adds rows to an existing table.
    """
    repeated_string_columns = (
        "aliases", "date_of_birth", "nationality", "gender", "ethnic_group",
        "occupation", "party", "academic_degree", "candidacy", "religion",
    )
    # NOTE(review): only these ten columns are declared. Confirm that BigQuery accepts
    # a partial schema for Parquet loads (the files also contain lastrevid, id, label,
    # type, US_congress_bio_ID), and whether Parquet LIST columns need list inference
    # to map onto REPEATED STRING. TODO confirm.
    job_config = bigquery.LoadJobConfig(
        create_disposition="CREATE_IF_NEEDED",
        write_disposition=write_disposition,
        # This used to be assigned to an unused local variable, so the job fell back
        # to the default source format (CSV) and could not read Parquet data.
        source_format=bigquery.SourceFormat.PARQUET,
        schema=[
            bigquery.SchemaField(column, "STRING", mode="REPEATED")
            for column in repeated_string_columns
        ],
    )

    def _start_load(file_obj):
        # Submit the load job for an already-open binary file object.
        return bigquery_client.load_table_from_file(
            file_obj, bq_destination_table, job_config=job_config)

    if isinstance(parquet_file, (str, os.PathLike)):
        with open(parquet_file, "rb") as file_obj:
            upload_file_to_bq_job = _start_load(file_obj)
    else:
        upload_file_to_bq_job = _start_load(parquet_file)
    print("Uploaded Parquet to BQ: ", upload_file_to_bq_job.result())

#Upload a JSON to BQ
def upload_json_to_bq(json_object, bq_table):
    """Append newline-delimited JSON to a BigQuery table, reporting errors instead of raising.

    Args:
        json_object: Binary file-like object holding newline-delimited JSON, as
            accepted by `Client.load_table_from_file`.
        bq_table: Fully qualified destination table id.

    Any failure (job error, or a finished job that is not a completed load) is
    printed as an "ERROR ..." line rather than propagated.
    """
    job_config = bigquery.LoadJobConfig()
    # autodetect is off, so the destination table's existing schema is used.
    # Set it to True if the table does not exist on BQ yet.
    job_config.autodetect = False
    job_config.max_bad_records = 0  # fail the whole job on the first bad record
    job_config.ignore_unknown_values = True  # drop JSON fields that are not in the table schema
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    job_config.create_disposition = "CREATE_IF_NEEDED"
    job_config.write_disposition = "WRITE_APPEND"
    try:
        job = bigquery_client.load_table_from_file(json_object, bq_table, job_config=job_config)
        print("Loaded JSON to BQ table {} as job {}".format(bq_table, job.result()))
        # Explicit check instead of `assert`, which is stripped when Python runs with -O.
        if job.job_type != 'load' or job.state != 'DONE':
            raise RuntimeError(
                "unexpected job type/state: {}/{}".format(job.job_type, job.state))
    except Exception as exc:
        # Do not touch `job` here: it may be unbound, and calling job.result()
        # again would just re-raise the failure inside the handler.
        print("ERROR Could not load JSON to BQ table {}: {!r}".format(bq_table, exc))


## Load Speaker Attributes to BQ

In [None]:
parquet_folder_location = "Project datasets/speaker_attributes.parquet"
# The folder holds one "part-*.parquet" data file per partition (presumably written
# by Spark). Keep only regular files and skip the "_SUCCESS" marker and ".crc"
# checksum files. Sorted so the part files are uploaded in a stable order.
list_of_paquet_files_to_upload = sorted(
    entry
    for entry in os.listdir(parquet_folder_location)
    if not ("SUCCESS" in entry or ".crc" in entry)
    and os.path.isfile(os.path.join(parquet_folder_location, entry))
)

In [None]:
# Show the sorted part files that the next cell uploads, one BQ table per file.
list_of_paquet_files_to_upload

['part-00000-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.parquet',
 'part-00001-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.parquet',
 'part-00002-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.parquet',
 'part-00003-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.parquet',
 'part-00004-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.parquet',
 'part-00005-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.parquet',
 'part-00006-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.parquet',
 'part-00007-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.parquet',
 'part-00008-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.parquet',
 'part-00009-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.parquet',
 'part-00010-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.parquet',
 'part-00011-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.parquet',
 'part-00012-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.parquet',
 'part-00013-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.pa

In [None]:
# For every part file: read it into a DataFrame and upload it to its own BQ table
# (speaker_attributes000, speaker_attributes001, ...).
# WRITE_TRUNCATE makes the cell idempotent: re-running it replaces each table's
# contents instead of appending a second copy of every row.
# `df` intentionally keeps its name because a later cell inspects the last part.
for counter, file_name in enumerate(list_of_paquet_files_to_upload):
    file_path = os.path.join(parquet_folder_location, file_name)
    print("Uploading ", file_name)

    # read file
    df = pd.read_parquet(file_path)

    # upload file to BQ
    bq_destination_table_suffix = str(counter).zfill(3)
    bq_destination_table = "epfl-course.wikidata_parquet.speaker_attributes" + bq_destination_table_suffix
    print("New table name: ", bq_destination_table)
    upload_df_to_bq(df, bq_destination_table, write_disposition="WRITE_TRUNCATE")


Uploading  part-00000-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.parquet




Uploaded DF to BQ:  <google.cloud.bigquery.job.LoadJob object at 0x7fe309ca1110>
Uploading  part-00001-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.parquet
Uploaded DF to BQ:  <google.cloud.bigquery.job.LoadJob object at 0x7fe2f9342cd0>
Uploading  part-00002-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.parquet
Uploaded DF to BQ:  <google.cloud.bigquery.job.LoadJob object at 0x7fe2defebcd0>
Uploading  part-00003-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.parquet
Uploaded DF to BQ:  <google.cloud.bigquery.job.LoadJob object at 0x7fe310659e90>
Uploading  part-00004-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.parquet
Uploaded DF to BQ:  <google.cloud.bigquery.job.LoadJob object at 0x7fe30a029f90>
Uploading  part-00005-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.parquet
Uploaded DF to BQ:  <google.cloud.bigquery.job.LoadJob object at 0x7fe30a0d5dd0>
Uploading  part-00006-0d587965-3d8f-41ce-9771-5b8c9024dce9-c000.snappy.parquet
Uploaded DF to BQ:  <google.cloud.bigque

In [None]:
# pandas dtypes of the LAST part file read by the upload loop above (`df` is the
# loop variable left over after that loop). These are DataFrame dtypes, not the
# BigQuery table schema itself.
df.dtypes

aliases               object
date_of_birth         object
nationality           object
gender                object
lastrevid              int64
ethnic_group          object
US_congress_bio_ID    object
occupation            object
party                 object
academic_degree       object
id                    object
label                 object
candidacy             object
type                  object
religion              object
dtype: object

## Load Wikidata labels and descriptions to BQ

In [None]:
#Load the wikidata to BQ by chunks of 100000 rows
# Stream the bz2-compressed CSV in chunks of 100000 rows so that the whole file never
# has to be held in memory. With `chunksize`, read_csv returns an iterator of
# DataFrames (a TextFileReader), not a single DataFrame, despite the variable name.
wikidata_labels_csv_path = "Project datasets/wikidata_labels_descriptions.csv.bz2"
rows_per_chunk = 100000
wikidata_labels_descriptions_df = pd.read_csv(
    wikidata_labels_csv_path,
    chunksize=rows_per_chunk,
    compression='bz2',
)

In [None]:
bq_destination_table_labels = "epfl-course.wikidata_parquet.wikidata_labels_descriptions"
# Upload every chunk into the same table. The first chunk truncates the table and
# the following chunks append, so re-running the notebook from the top rebuilds the
# table instead of appending a duplicate copy of every row.
# NOTE: the chunk reader created in the previous cell is single-use. Re-run that cell
# before re-running this one; otherwise the loop either uploads nothing (exhausted
# reader) or truncates and restarts from a partially consumed reader.
# `chunk` intentionally keeps its name because a later cell inspects the last chunk.
for counter, chunk in enumerate(wikidata_labels_descriptions_df):
    print("Uploading chunk ", counter)
    disposition = "WRITE_TRUNCATE" if counter == 0 else "WRITE_APPEND"
    upload_df_to_bq(chunk, bq_destination_table_labels, write_disposition=disposition)

Uploading chunk  0




Uploaded DF to BQ:  <google.cloud.bigquery.job.LoadJob object at 0x7fb18c927cd0>
Uploading chunk  1
Uploaded DF to BQ:  <google.cloud.bigquery.job.LoadJob object at 0x7fb1883d19d0>
Uploading chunk  2
Uploaded DF to BQ:  <google.cloud.bigquery.job.LoadJob object at 0x7fb188435810>
Uploading chunk  3
Uploaded DF to BQ:  <google.cloud.bigquery.job.LoadJob object at 0x7fb18d6b8950>
Uploading chunk  4
Uploaded DF to BQ:  <google.cloud.bigquery.job.LoadJob object at 0x7fb188a43d50>
Uploading chunk  5
Uploaded DF to BQ:  <google.cloud.bigquery.job.LoadJob object at 0x7fb18614fed0>
Uploading chunk  6
Uploaded DF to BQ:  <google.cloud.bigquery.job.LoadJob object at 0x7fb18524e250>
Uploading chunk  7
Uploaded DF to BQ:  <google.cloud.bigquery.job.LoadJob object at 0x7fb1842c11d0>
Uploading chunk  8
Uploaded DF to BQ:  <google.cloud.bigquery.job.LoadJob object at 0x7fb1843036d0>
Uploading chunk  9
Uploaded DF to BQ:  <google.cloud.bigquery.job.LoadJob object at 0x7fb18e365bd0>
Uploading chunk  10

In [None]:
# Wikidata labels table: pandas dtypes of the LAST chunk uploaded above (`chunk` is
# the loop variable left over after the upload loop). These are DataFrame dtypes,
# not the BigQuery table schema itself.
print(chunk.dtypes)

QID            object
Label          object
Description    object
dtype: object
