In [1]:
import csv
import os
import zipfile
from io import StringIO

import mysql.connector
import pandas as pd
import pandas_gbq
from google.cloud import bigquery
from google.cloud.exceptions import NotFound

### Testing on home server

In [None]:
# Pre-bind both handles so that a failed connection attempt leaves them as
# None instead of undefined names (the original `finally` clause raised
# NameError on `cursor` whenever connect() failed, hiding the real error).
connection = None
cursor = None

try:
    # Host and credentials are read from environment variables.
    connection = mysql.connector.connect(
        host = os.getenv('sql_host'),
        user = os.getenv('sql_user'),
        password = os.getenv('sql_pw'),
        database = 'wedge_assignment')

    cursor = connection.cursor()
except mysql.connector.Error as err:
    print(f'{err}')
else:
    # Reached only when both the connection and the cursor were created.
    print('Cursor created')

#### Setting the Data Directory (macOS or Windows)

In [3]:
# Start with the macOS locations; os.listdir() is used purely as an
# existence probe for the full-archive directory.
directory_small, directory = (
    '/Users/aaronportra/Documents/ADA/wedge_project/WedgeZipOfZips_small/',
    '/Users/aaronportra/Documents/ADA/wedge_project/WedgeZipOfZips/',
)
try:
    os.listdir(directory)
except FileNotFoundError:
    # Not on the Mac: switch to the Windows locations. A second
    # FileNotFoundError here is deliberately left to propagate.
    directory_small, directory = (
        'C:\\Users\\aport\\OneDrive\\Documents\\School\\Fall Semester 2024\\Applied Data Analytics\\wedge_project\\WedgeZipOfZips_small\\',
        'C:\\Users\\aport\\OneDrive\\Documents\\School\\Fall Semester 2024\\Applied Data Analytics\\wedge_project\\WedgeZipOfZips\\',
    )
    os.listdir(directory)

## Cleaning Data for BigQuery

In [4]:
def sniff(file, sample_size=10000, min_header_columns=50):
    """Guess the delimiter of delimited text and whether it starts with a header.

    Parameters
    ----------
    file : str
        The full text content of a delimited file (not a path or file object).
    sample_size : int, default 10000
        Number of leading characters handed to ``csv.Sniffer``.
    min_header_columns : int, default 50
        A first row with fewer fields than this is never reported as a
        header, regardless of what ``csv.Sniffer.has_header`` would say.
        NOTE(review): 50 is presumably the width of the expected
        transaction schema -- confirm.

    Returns
    -------
    tuple
        ``(delimiter, has_header)`` where ``delimiter`` is a one-character
        string and ``has_header`` is a bool.

    Raises
    ------
    csv.Error
        If ``csv.Sniffer`` cannot determine a delimiter from the sample.
    """
    sample = file[:sample_size]

    sniffer = csv.Sniffer()
    delimiter = sniffer.sniff(sample).delimiter

    # Parse only the first record to count its fields; the default guards
    # against text that yields no rows at all.
    first_row = next(csv.reader(StringIO(file), delimiter=delimiter), [])

    # Too narrow to be a header row: skip the has_header() heuristic entirely.
    if len(first_row) < min_header_columns:
        return delimiter, False

    return delimiter, sniffer.has_header(sample)

In [5]:
# #Adding space to text. Example: ChickenBreast -> Chicken Breast
# def add_space(text):
#     if isinstance(text,str):
#         return re.sub(r'([a-z])([A-Z])',r'\1 \2',text)

#     return text


In [6]:
#Read in files and load to pandas

def create_db(directory):
    """Load every ``.zip`` archive in ``directory`` into a cleaned DataFrame.

    Each archive member is decoded as UTF-8 (undecodable bytes replaced),
    inspected with ``sniff`` for its delimiter and header, and read with
    pandas. Members without a header row receive the column names of the
    most recently read member that had one; header-less members read before
    any header was seen are labelled once a header becomes available.

    One DataFrame is stored per archive, keyed by the archive file name
    without its ``.zip`` extension. When an archive holds several members
    only the last one is kept, as in the original implementation.
    NOTE(review): confirm that every archive is expected to hold one file.

    After loading, each table gets:
      * ``datetime`` parsed with ``pd.to_datetime``;
      * double quotes removed from ``description``;
      * every ``Int64`` or ``object`` column converted to ``float64``
        (values that are not numeric become NaN).

    Parameters
    ----------
    directory : str
        Path of the folder containing the ``.zip`` archives.

    Returns
    -------
    dict
        Mapping of archive base name to cleaned ``pandas.DataFrame``.

    Raises
    ------
    ValueError
        If header-less files were read but no file supplied a header.
    """
    transactions = {}
    columns = None          # header of the most recent member that had one
    awaiting_header = []    # keys of tables read before any header was known

    # os.listdir() returns entries in arbitrary order; sorting makes the
    # header carry-over deterministic from run to run.
    for file in sorted(os.listdir(directory)):
        if not file.endswith('.zip'):
            continue

        data = None
        lacks_header = False

        with zipfile.ZipFile(os.path.join(directory, file), 'r') as zip_file:
            for member in zip_file.infolist():
                # Directory entries carry no data and cannot be sniffed.
                if member.is_dir():
                    continue

                file_content = zip_file.read(member).decode('utf-8', errors='replace')
                delimiter, has_header = sniff(file_content)

                if has_header:
                    data = pd.read_csv(StringIO(file_content), delimiter=delimiter)
                    columns = data.columns
                    lacks_header = False
                else:
                    data = pd.read_csv(StringIO(file_content), delimiter=delimiter, header=None)
                    # Defer labelling when no header has been seen yet.
                    lacks_header = columns is None
                    if not lacks_header:
                        data.columns = columns

                data = data.convert_dtypes()

        # Archive without any readable member: nothing to store.
        if data is None:
            continue

        # splitext removes exactly the extension; str.rstrip('.zip') would
        # also eat trailing '.', 'z', 'i' or 'p' characters of the name.
        key = os.path.splitext(file)[0]
        transactions[key] = data
        if lacks_header:
            awaiting_header.append(key)

    if awaiting_header:
        if columns is None:
            raise ValueError(
                'No file with a header row was found; cannot name the columns of: '
                + ', '.join(awaiting_header))
        for key in awaiting_header:
            transactions[key].columns = columns

    for frame in transactions.values():
        frame['datetime'] = pd.to_datetime(frame['datetime'])
        frame['description'] = frame['description'].str.replace('"', '', regex=False)

        # Snapshot the column labels because columns are reassigned in the loop.
        for col in list(frame.columns):
            if frame[col].dtypes == 'Int64' or frame[col].dtypes == 'object':
                frame[col] = pd.to_numeric(frame[col], errors='coerce').astype('float64')

    return transactions

In [None]:
# Build the dictionary of cleaned tables from the full archive directory.
# The commented-out call below does the same for `directory_small`
# (presumably a smaller sample for quick iteration -- confirm).
# small_zips = create_db(directory_small)

full_zips = create_db(directory)

In [8]:
# Turn the flag-like columns into real booleans:
# anything non-numeric or missing counts as 0, and every non-zero value is True.

bools = ['memType','staff','batchHeaderID','display']

for frame in full_zips.values():
    for flag in bools:
        # Skip columns that are absent or already boolean.
        if flag not in frame or frame[flag].dtypes == bool:
            continue

        as_number = pd.to_numeric(frame[flag], errors='coerce')
        frame[flag] = (as_number.fillna(0) != 0).astype(bool)


In [9]:
#Saving to CSV

# try:
#     save_path = '/Users/aaronportra/Documents/ADA/wedge_project/clean_transactions/'
#     os.listdir(save_path)
# except FileNotFoundError:
#     save_path = 'C:\\Users\\aport\\OneDrive\\Documents\\School\\Fall Semester 2024\\Applied Data Analytics\\wedge_project\\clean_transactions\\'
#     os.listdir(save_path)

# for trans in full_zips:
#     full_zips[trans].to_csv(save_path + trans + '.csv', encoding = 'utf-8', index = False)


In [10]:
# Force the `organic` column of this one table to float.
# pd.to_numeric is called without errors='coerce', so a non-numeric value raises.
inactive_table = full_zips['transArchive_201201_201203_inactive']
organic_numeric = pd.to_numeric(inactive_table['organic'])
inactive_table['organic'] = organic_numeric.astype('float')

In [11]:
# BigQuery destination: project and dataset that receive the tables.
project_id = 'wedge-project-0'

dataset_id = 'transactions'

client = bigquery.Client(project = project_id)

# Build the reference directly; Client.dataset() is deprecated.
dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
try:
    client.get_dataset(dataset_ref)
except NotFound:
    # Create the dataset only when it is genuinely missing. Any other
    # failure (credentials, permissions, network) now surfaces instead of
    # being mistaken for "dataset does not exist".
    client.create_dataset(dataset_ref)


In [None]:
# Upload every table that is not already present in the dataset.
for trans in full_zips:
    table_id = f'{project_id}.{dataset_id}.{trans}'

    try:
        client.get_table(table_id)
    except NotFound:
        # Only a missing table triggers an upload; other API errors
        # propagate rather than silently starting a load job.
        pandas_gbq.to_gbq(full_zips[trans],project_id = project_id,destination_table = table_id)
    else:
        print(f'{trans} already exists')

In [None]:
# Look the handle up defensively: when the connection attempt failed earlier,
# `connection` may be undefined or None, and this cell should then do nothing
# instead of raising.
open_connection = globals().get('connection')

if open_connection is not None and open_connection.is_connected():
    open_connection.close()
    print('Connection Closed')


In [None]:
# Show the name of every loaded table, one per line.
for table_name in full_zips.keys():
    print(table_name)

In [None]:
# Print each column of the 2012 Q1 "inactive" table followed by its dtype.
inactive_frame = full_zips['transArchive_201201_201203_inactive']

for column_name in inactive_frame.columns:
    print(column_name)
    print(inactive_frame[column_name].dtypes)