In [2]:
import json
import logging
import os
import time
from urllib.request import urlopen
from zipfile import ZipFile

import pandas as pd
import schedule
from google.cloud import bigquery

In [3]:
def download_zip_into_folder(zipurl, zip_file_name):
    """Download the resource at ``zipurl`` and save it as ``zip_file_name``.

    Nothing is extracted here (see ``extract_zip``); the raw response bytes are
    written to ``zip_file_name``, overwriting any existing file.

    Parameters
    ----------
    zipurl : str
        URL of the archive to fetch.
    zip_file_name : str
        Local path the downloaded bytes are written to.
    """
    # Context managers close both the HTTP response and the output file even if
    # the read or the write raises; the manual open()/close() leaked on error
    # and never closed the response at all.
    with urlopen(zipurl) as zipresp, open(zip_file_name, "wb") as tempzip:
        tempzip.write(zipresp.read())

def extract_zip(zip_file_name, path=None):
    """Extract every member of the zip archive ``zip_file_name``.

    Parameters
    ----------
    zip_file_name : str
        Path of the archive to extract.
    path : str or None, optional
        Destination directory. ``None`` (the default) extracts into the current
        working directory, as before.
    """
    # The with-block closes the archive even if extraction fails part-way.
    with ZipFile(zip_file_name) as zf:
        zf.extractall(path)
    
    
def read_json_files(datasets_folder):
    """Load the ``"clientes"`` records of every file in ``datasets_folder``.

    Each regular file in the folder is parsed as JSON. The value under its
    ``"clientes"`` key is turned into a DataFrame, and all frames are
    concatenated with a fresh 0..n-1 index.

    Parameters
    ----------
    datasets_folder : str
        Directory containing the JSON files. Sub-directories are skipped.

    Returns
    -------
    pandas.DataFrame
        All records, in file-name order, then in-file order.

    Raises
    ------
    ValueError
        If the folder contains no files.
    KeyError
        If a file has no ``"clientes"`` key.
    """
    frames = []
    # sorted() makes the row order deterministic; os.listdir order is arbitrary.
    for entry in sorted(os.listdir(datasets_folder)):
        path = os.path.join(datasets_folder, entry)
        if not os.path.isfile(path):
            continue
        # JSON text is UTF-8 (RFC 8259). Relying on the platform default
        # encoding (e.g. cp1252 on Windows) would mis-decode accented characters.
        with open(path, encoding="utf-8") as fh:
            payload = json.load(fh)
        frames.append(pd.DataFrame.from_dict(payload['clientes']))

    if not frames:
        raise ValueError(f"No JSON files found in {datasets_folder!r}")
    return pd.concat(frames).reset_index(drop=True)
    

def update_to_bigquery(datasets_folder='Data Engineer - Challenge',
                       table_id='keycashtest.keycash_dataset.LANDING_TABLE',
                       key_path='key.json'):
    """Append to BigQuery the local client records whose ``id`` is not yet in the table.

    Steps:
    1. Read every JSON file in ``datasets_folder`` (via ``read_json_files``).
    2. Fetch the ids already stored in ``table_id``.
    3. Keep the local rows whose ``id`` is absent. This is a left anti-join on
       ``id`` that keeps the first row per id.
    4. Parse ``data_solicitacao`` to datetime.
    5. Append the new rows to ``table_id``.

    Parameters
    ----------
    datasets_folder : str, optional
        Folder with the extracted JSON files.
    table_id : str, optional
        Fully-qualified ``project.dataset.table`` used for both the read and the
        load, so both always target the same table.
    key_path : str, optional
        Service-account JSON key used to build the BigQuery client.

    Returns
    -------
    int
        Number of rows appended (0 when there is nothing new).
    """
    client = bigquery.Client.from_service_account_json(key_path)

    local_df = read_json_files(datasets_folder)

    # Only the ids are needed to detect new rows, so avoid downloading the full
    # table. table_id is a developer-supplied identifier; BigQuery cannot bind
    # table names as query parameters.
    existing_ids = (
        client.query(f"SELECT id FROM `{table_id}`")
        .result()
        .to_dataframe()['id']
    )

    # Vectorised anti-join: the previous list comprehension did a linear scan of
    # the existing-id array for every local row (O(n*m)).
    new_rows = (
        local_df[~local_df['id'].isin(existing_ids)]
        .drop_duplicates(subset='id')
        .reset_index(drop=True)
    )
    if new_rows.empty:
        return 0

    # infer_datetime_format is deprecated in pandas 2 (inference is now the default).
    new_rows = new_rows.assign(
        data_solicitacao=pd.to_datetime(new_rows['data_solicitacao'])
    )

    # Load ONLY the new rows. Load jobs default to WRITE_APPEND, so loading the
    # existing rows back alongside them would duplicate the table on every run.
    job = client.load_table_from_dataframe(new_rows, table_id)
    job.result()  # wait for completion and surface load errors
    return len(new_rows)



In [4]:
# Fetch the challenge archive and unpack it into the working directory.
zip_url = 'https://keycash-mkt.s3.amazonaws.com/vagas/Data-Engineer-Challenge.zip'
zip_file_name = "DE_challenge.zip"

download_zip_into_folder(zipurl=zip_url, zip_file_name=zip_file_name)
extract_zip(zip_file_name=zip_file_name)

# Next: push the extracted data to BigQuery

In [None]:
def _daily_sync():
    """Refresh the local dataset and push any new rows to BigQuery.

    Failures are logged instead of raised. schedule.run_pending() lets job
    exceptions propagate, so one transient error (network, BigQuery) would
    otherwise end the loop below and silently stop all future runs.
    """
    try:
        # Re-download each time so the daily run sees the current archive,
        # not the copy fetched once when the notebook was started.
        download_zip_into_folder(zip_url, zip_file_name)
        extract_zip(zip_file_name)
        update_to_bigquery()
    except Exception:
        logging.exception("Daily BigQuery sync failed; retrying at the next scheduled run")


# clear() + tag() keep this cell idempotent: re-running it after an interrupt
# does not register a second copy of the job.
schedule.clear('bigquery-sync')
schedule.every().day.at("15:30").do(_daily_sync).tag('bigquery-sync')

while True:  # blocks the kernel; interrupt to stop
    schedule.run_pending()
    time.sleep(90)

In [30]:
# WARNING: destructive. This deletes every row of LANDING_TABLE.
# NOTE(review): presumably used to reset the table between test loads; confirm
# before running against real data.
# The client is created here because the one in update_to_bigquery() is a local
# variable and is not visible at notebook level.
client = bigquery.Client.from_service_account_json("key.json")
query = "DELETE FROM `keycashtest.keycash_dataset.LANDING_TABLE` WHERE 1=1;"

output = client.query(query)
# client.query() only starts the job; result() waits for it and raises on error.
output.result()