# _Development: Version 3 of Data Ingestion Python Script_

In [36]:
from datetime import datetime
import git
import re
from pathlib import Path
import pandas as pd
import time
import subprocess
from google.cloud import storage

In [37]:
def setpath():
    """Return the current user's home directory as a ``pathlib.Path``."""
    home_dir = Path.home()
    return home_dir

In [38]:
#homepath = setpath()
#homepath

In [39]:
def clone_panacea_repo(homepath):
    """Clone the Panacea Lab COVID-19 Twitter repository under ``homepath``.

    The clone target is ``homepath / 'thepanacealab_covid19'``. This is a
    best-effort operation: any exception raised while cloning is printed and
    swallowed, so the function always returns ``None``.
    """
    remote_url = 'https://github.com/thepanacealab/covid19_twitter.git'
    try:
        print('Cloning repository...')
        destination = homepath / 'thepanacealab_covid19'
        git.Repo.clone_from(remote_url, destination)
        print('Repo cloned.')
    except Exception as err:
        # Report the problem but let the caller continue.
        print(err)

In [40]:
#clone_panacea_repo(homepath)

In [41]:
def panacea_pull(panacearepopath):
    """Run ``git pull`` in the repository at ``panacearepopath``.

    Returns whatever the GitPython ``pull()`` call returns, so the caller
    can print it.
    """
    return git.cmd.Git(panacearepopath).pull()

In [42]:
#panacearepopath = setpath() / 'thepanacealab_covid19'
#panacea_pull(panacearepopath)

In [43]:
def make_raw_folders(myrepopath, daily_list):
    """Ensure a raw-data folder exists for every day in ``daily_list``.

    Creates ``myrepopath / 'data' / 'raw_dailies' / <day>`` for each entry.
    Folders that already exist are left alone, and missing parent folders
    (``data`` / ``raw_dailies``) are created as needed.

    Args:
        myrepopath: ``Path`` to the root of the local repository.
        daily_list: iterable of daily folder names (e.g. ``'2020-03-22'``).
    """
    raw_root = myrepopath / 'data' / 'raw_dailies'
    for day in daily_list:
        # parents=True: do not fail when raw_dailies itself is missing;
        # exist_ok=True: replaces the separate exists() check.
        (raw_root / day).mkdir(parents=True, exist_ok=True)

In [44]:
def make_proc_folders(myrepopath, daily_list):
    """Ensure a processed-data folder exists for every day in ``daily_list``.

    Creates ``myrepopath / 'data' / 'processed_dailies' / <day>`` for each
    entry. Folders that already exist are left alone, and missing parent
    folders (``data`` / ``processed_dailies``) are created as needed.

    Args:
        myrepopath: ``Path`` to the root of the local repository.
        daily_list: iterable of daily folder names (e.g. ``'2020-03-22'``).
    """
    processed_root = myrepopath / 'data' / 'processed_dailies'
    for day in daily_list:
        # parents=True: do not fail when processed_dailies itself is missing;
        # exist_ok=True: replaces the separate exists() check.
        (processed_root / day).mkdir(parents=True, exist_ok=True)

In [45]:
def get_txt_data(myrepopath, panacearepopath, daily_list):
    """Extract tweet IDs for each day into a local text file.

    For every ``day`` in ``daily_list`` the compressed TSV
    ``panacearepopath/dailies/<day>/<day>_clean-dataset.tsv.gz`` is read
    (``tweet_id`` column only) and written, without header or index, to
    ``myrepopath/data/raw_dailies/<day>/<day>_clean-dataset.txt``.
    Days whose text file is already present are skipped.
    """
    raw_root = myrepopath / 'data' / 'raw_dailies'
    dailies_root = panacearepopath / 'dailies'
    for day in daily_list:
        storagepath = raw_root / day
        txt_name = f'{day}_clean-dataset.txt'
        # Names of everything currently in the local folder for this day.
        already_present = {entry.name for entry in storagepath.iterdir()}
        if txt_name in already_present:
            continue
        # Only the tweet_id column is needed from the Panacea dataset.
        tweet_ids = pd.read_csv(
            dailies_root / day / f'{day}_clean-dataset.tsv.gz',
            sep='\t',
            usecols=['tweet_id'],
            compression='gzip',
        )
        tweet_ids.to_csv(storagepath / txt_name, header=None, index=None)

In [46]:
def main_setup():
    """Prepare the local folders and tweet-ID text files for data gathering.

    Steps:
      1. Resolve the home directory, this repo's root (two levels above the
         current working directory) and the Panacea repo location
         (``<home>/thepanacealab_covid19``).
      2. Clone the Panacea repo if it is not already present, then pull.
      3. List the daily folders in the Panacea repo's ``dailies`` directory.
      4. Make sure ``data/raw_dailies`` and ``data/processed_dailies`` (plus
         one sub-folder per day) exist in this repo, and write the tweet-ID
         text file for every day that does not have one yet.
    """
    # set up path to current working directory & path to directory containing Panacea data
    homepath = setpath()
    myrepopath = Path.cwd().parent.parent
    panacearepopath = homepath / 'thepanacealab_covid19'
    myrepopath.mkdir(parents=True, exist_ok=True)

    # if Panacea lab folder is in the home directory, print confirmation, else clone the repo
    if panacearepopath.exists():
        print('Panacea Labs COVID-19 GitHub has already been cloned...')
    else:
        # Bug fix: this used to pass the undefined name `path`.
        clone_panacea_repo(homepath)

    # pull any recent updates from Panacea Lab repo
    pull_result = panacea_pull(panacearepopath)
    print(pull_result)

    # create list of daily folders located in Panacea repo (which contains data we need to access)
    file_ignore = ['README.md', '.ipynb_checkpoints']
    daily_list = [x.name for x in sorted((panacearepopath / 'dailies').iterdir())
                  if x.name not in file_ignore]

    mydatapath = myrepopath / 'data'

    # make sure data/raw_dailies exists, then make the daily folders and get text of IDs
    (mydatapath / 'raw_dailies').mkdir(parents=True, exist_ok=True)
    make_raw_folders(myrepopath, daily_list)
    get_txt_data(myrepopath, panacearepopath, daily_list)

    # make sure data/processed_dailies exists, then create the daily folders
    (mydatapath / 'processed_dailies').mkdir(parents=True, exist_ok=True)
    make_proc_folders(myrepopath, daily_list)

In [47]:
#main_setup()

In [48]:
def blob_exists(bucket_name, source_file_name):
    """Return the result of ``exists()`` for the named blob in the bucket.

    A new ``storage.Client`` is created on every call; credentials are
    resolved by the client library.
    """
    client = storage.Client()
    return client.bucket(bucket_name).blob(source_file_name).exists()

In [49]:
def storage_check(daily_list):
    """Return the days that have no hydrated JSON in the storage bucket.

    For each day, two candidate object names are checked in the
    ``thepanacealab_covid19twitter`` bucket:

      * ``dailies/<day>/<day>_clean-dataset.json``
      * ``dailies/<day>/panacealab_<day>_clean-dataset.json``

    A day is reported as missing only when neither object exists.

    Args:
        daily_list: iterable of daily folder names.

    Returns:
        List of the days (in input order) with no JSON in the bucket.
    """
    bucket_name = 'thepanacealab_covid19twitter'
    nojson = []
    for day in daily_list:
        candidates = (
            f'dailies/{day}/{day}_clean-dataset.json',
            f'dailies/{day}/panacealab_{day}_clean-dataset.json',
        )
        # any() short-circuits, so the second lookup is skipped when the
        # first object is already found.
        if not any(blob_exists(bucket_name, name) for name in candidates):
            nojson.append(day)
    return nojson

In [50]:
def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Upload a local file to a storage bucket and print a confirmation.

    Args:
        bucket_name: name of the target bucket, e.g. "your-bucket-name".
        source_file_name: local file to upload, e.g. "local/path/to/file".
        destination_blob_name: object name in the bucket,
            e.g. "storage-object-name".
    """
    target_bucket = storage.Client().bucket(bucket_name)
    target_blob = target_bucket.blob(destination_blob_name)
    target_blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

In [51]:
#file_ignore = ['README.md', '.ipynb_checkpoints']
#daily_list = [x.name for x in sorted((panacearepopath / 'dailies').iterdir())\
#              if x.name not in file_ignore]

In [52]:
#def implicit():
#    from google.cloud import storage

    # If you don't specify credentials when constructing the client, the
    # client library will look for credentials in the environment.
#    storage_client = storage.Client()

    # Make an authenticated API request
#    buckets = list(storage_client.list_buckets())
#    print(buckets)

In [53]:
#implicit()

In [54]:
#nojson = storage_check(daily_list)

In [55]:
#nojson

In [56]:
#previous3 = nojson[-3:]
#print(previous3)
#testday = nojson[-1]

In [57]:
#subprocess.call(["ls", "-l"])

In [58]:
#subprocess.check_output(["ls", "-l"])

In [59]:
#myrepopath = homepath/'Documents/SharpestMinds/covid_disinfo_detect'
#myrawdatapath =  myrepopath/'data'/'raw_dailies'
#print(previous3)

In [60]:
#day = '2020-05-18'
#daypath = myrawdatapath / day

In [61]:
#twarc_command = f'twarc hydrate {daypath}/{day}_clean-dataset.txt > {daypath}/{day}_clean-dataset.json'
#print(twarc_command)

In [62]:
#subprocess.call(twarc_command, shell=True)

In [63]:
def twarc_gather(myrawdatapath, daily_list):
    """Hydrate tweet IDs with twarc, upload the JSON, then delete it locally.

    For every ``day`` in ``daily_list``:
      1. run ``twarc hydrate <day>_clean-dataset.txt`` and capture its
         stdout in ``<day>_clean-dataset.json`` (both inside
         ``myrawdatapath / day``);
      2. upload the JSON to the ``thepanacealab_covid19twitter`` bucket as
         ``dailies/<day>/<day>_clean-dataset.json``;
      3. remove the local JSON file.

    Processing is best-effort per day: any exception is printed and the loop
    moves on to the next day. If twarc exits with a non-zero status the JSON
    is neither uploaded nor deleted; it is overwritten on the next attempt.

    Args:
        myrawdatapath: ``Path`` to the local ``raw_dailies`` folder.
        daily_list: iterable of daily folder names to hydrate.
    """
    bucket_name = 'thepanacealab_covid19twitter'
    for day in daily_list:
        daypath = myrawdatapath / day
        ids_file = daypath / f'{day}_clean-dataset.txt'
        json_file = daypath / f'{day}_clean-dataset.json'
        try:
            print(f'Hydrating data for {day}...')
            # Argument list + explicit stdout file instead of a shell string:
            # safe for paths containing spaces or shell metacharacters.
            # check=True raises CalledProcessError on a non-zero exit so a
            # failed hydration is never uploaded.
            with open(json_file, 'wb') as json_out:
                subprocess.run(
                    ['twarc', 'hydrate', str(ids_file)],
                    stdout=json_out,
                    check=True,
                )
            print('Uploading to bucket...')
            upload_blob(
                bucket_name=bucket_name,
                source_file_name=f'{daypath}/{day}_clean-dataset.json',
                destination_blob_name=f'dailies/{day}/{day}_clean-dataset.json',
            )
            print(f'JSON file uploaded to Storage Bucket, now removing JSON from {day} folder...')
            # remove JSON file
            json_file.unlink()
            print(f'JSON removed from {day} folder!')
        except Exception as e:
            # Best-effort: report the failure and continue with the next day.
            print(e)

In [64]:
#twarc_gather(myrawdatapath, previous3[::-1])

In [65]:
#%%time
#twarc_gather(myrawdatapath, previous3[::-1])

In [66]:
#%%time
#twarc_gather(myrawdatapath, previous3[::-1])

In [67]:
#homepath = setpath()
#myrepopath = Path.cwd().parent.parent
#print(homepath)
#print(myrepopath)

In [68]:
def main_gather():
    """Hydrate and upload every day that has no JSON in the storage bucket.

    Lists the daily folders in ``<home>/thepanacealab_covid19/dailies``,
    asks ``storage_check`` which of them have no JSON in the bucket, prints a
    summary, and passes those days (in reverse order) to ``twarc_gather``.
    """
    # Panacea data lives under the home directory; this repo's root is two
    # levels above the current working directory.
    panacearepopath = setpath() / 'thepanacealab_covid19'
    myrawdatapath = Path.cwd().parent.parent / 'data' / 'raw_dailies'
    # processed_dailies path is not needed here -- don't believe I need it at the moment

    # daily folders in the Panacea repo, minus non-data entries
    skipped_names = ['README.md', '.ipynb_checkpoints']
    daily_list = []
    for entry in sorted((panacearepopath / 'dailies').iterdir()):
        if entry.name not in skipped_names:
            daily_list.append(entry.name)

    # days with no JSON in the storage bucket
    nojson = storage_check(daily_list)
    print(f'\nTotal of {len(nojson)} folders do not contain a JSON file:\n{nojson}\n')
    reversed_days = nojson[::-1]
    print(f'Gathering data for the previous days without JSONs:\n{reversed_days}')
    twarc_gather(myrawdatapath, reversed_days)

In [69]:
def main_program():
    """Run the full pipeline: local setup first, then gather and upload."""
    for stage in (main_setup, main_gather):
        stage()

In [70]:
%%time
# Run the whole pipeline (setup, then gather/upload), printing wall-clock
# timestamps before and after so the run duration is visible in the output.
print(f'Started at: {datetime.now()}')
main_program()
print(f'Ended at: {datetime.now()}')

Started at: 2020-05-25 17:03:29.613767
Panacea Labs COVID-19 GitHub has already been cloned...
Already up to date.

Total of 2 folders do not contain a JSON file:
['2020-03-22', '2020-03-28']

Gathering data for the previous days without JSONs:
['2020-03-28', '2020-03-22']
Hydrating data for 2020-03-28...
Uploading to bucket...
File /Users/jairesearch/Documents/SharpestMinds/covid_disinfo_detect/data/raw_dailies/2020-03-28/2020-03-28_clean-dataset.json uploaded to dailies/2020-03-28/2020-03-28_clean-dataset.json.
JSON file uploaded to Storage Bucket, now removing JSON from 2020-03-28 folder...
JSON removed from 2020-03-28 folder!
Hydrating data for 2020-03-22...
Uploading to bucket...
File /Users/jairesearch/Documents/SharpestMinds/covid_disinfo_detect/data/raw_dailies/2020-03-22/2020-03-22_clean-dataset.json uploaded to dailies/2020-03-22/2020-03-22_clean-dataset.json.
JSON file uploaded to Storage Bucket, now removing JSON from 2020-03-22 folder...
JSON removed from 2020-03-22 folder