In [2]:
import pandas_gbq
import os
import pandas as pd
from google.oauth2 import service_account
from google.cloud import bigquery
from google.cloud import storage
import numpy as np
import sqlite3



In [3]:

# Paths
# Directory that holds the CSV files to process. Set the WEDGE_DATA_DIR
# environment variable to run the notebook on another machine; the original
# location is kept as the default.
file_path = os.environ.get(
    "WEDGE_DATA_DIR",
    r"C:\Users\britt\OneDrive - The University of Montana\Applied Data Analytics\Wedge Project\wedge-project\Uploaded",
)

# BigQuery credentials and destination
# Service-account key file. Set WEDGE_CREDENTIALS_PATH to point at a different
# key; never commit the key file itself.
credentials_path = os.environ.get(
    "WEDGE_CREDENTIALS_PATH",
    r"C:\Users\britt\OneDrive - The University of Montana\Applied Data Analytics\Wedge Project\wedge-project\wedge-project-bt-bf0ddf1029cd.json",
)
credentials = service_account.Credentials.from_service_account_file(credentials_path)
gbq_proj_id = "wedge-project-bt"
dataset_id = "wedge_data"

## Task 1: Add headers to the CSV files and upload them to BigQuery

In [None]:

# Column names for the transaction files, in file order
headers = [
    'datetime', 'register_no', 'emp_no', 'trans_no', 'upc', 'description', 'trans_type', 'trans_subtype', 'trans_status',
    'department', 'quantity', 'Scale', 'cost', 'unitPrice', 'total', 'regPrice', 'altPrice', 'tax', 'taxexempt', 'foodstamp',
    'wicable', 'discount', 'memDiscount', 'discountable', 'discounttype', 'voided', 'percentDiscount', 'ItemQtty', 'volDiscType',
    'volume', 'VolSpecial', 'mixMatch', 'matched', 'memType', 'staff', 'numflag', 'itemstatus', 'tenderstatus', 'charflag',
    'varflag', 'batchHeaderID', 'local', 'organic', 'display', 'receipt', 'card_no', 'store', 'branch', 'match_id', 'trans_id'
]


def _first_line(path):
    """Return the first line of `path` as text, without a BOM or surrounding whitespace."""
    with open(path, 'rb') as f:
        # Binary read + lenient decode: a stray byte must not abort the whole run
        return f.readline().decode('utf-8-sig', errors='replace').strip()


def _has_header(first_line):
    """Return True when `first_line` already starts with the 'datetime' column name."""
    # Tolerate quoted and differently-cased header rows so they are not doubled up
    return first_line.lstrip('"\'').lower().startswith('datetime')


def _prepend_header(path, header_line):
    """Rewrite `path` so that `header_line` is its first line.

    The original bytes are streamed into a temporary sibling file which then
    replaces `path`, so the file is never held in memory as a whole and an
    interrupted run cannot leave a half-written CSV behind.
    """
    tmp_path = path + '.tmp'
    with open(path, 'rb') as src, open(tmp_path, 'wb') as dst:
        dst.write(header_line.encode('utf-8') + b'\n')
        for block in iter(lambda: src.read(1024 * 1024), b''):
            dst.write(block)
    os.replace(tmp_path, path)


# Loop through all files in the directory
for root, dirs, files in os.walk(file_path):
    for file in files:
        if not file.endswith('.csv'):
            continue
        full_path = os.path.join(root, file)
        first_line = _first_line(full_path)

        # Check if the file likely has headers based on the first line
        if _has_header(first_line):
            print(f"File {file} seems to already have headers. Skipping...")
            continue

        # The header row must use the same separator as the data rows;
        # otherwise the file would later be parsed with the wrong delimiter.
        delimiter = ';' if ';' in first_line else ','

        print(f"Adding headers to {file}")
        _prepend_header(full_path, delimiter.join(headers))


In [None]:

# Number of CSV rows read and uploaded per chunk
chunk_size = 50_000

# Show what the data directory contains (debugging aid)
print(f"Contents of {file_path}:")
print(os.listdir(file_path))

def detect_delimiter(filename):
    """Guess the field separator of a delimited text file.

    Only the first line is inspected: if it contains a semicolon, ";" is
    returned; otherwise "," is returned.
    """
    with open(filename, 'r') as handle:
        opening_line = handle.readline()
    return ";" if ";" in opening_line else ","

def clean_dataframe(df):
    """Normalize one chunk of raw transaction data before upload.

    Steps, in order:
      1. Strip surrounding whitespace from every text column.
      2. Turn the placeholders "NULL", backslash-N, double-backslash-N and
         empty strings into missing values. Stripping happens first so that
         padded placeholders such as " NULL " are caught as well.
      3. Cast the known columns: numeric columns to float, flag columns to the
         nullable "boolean" dtype, text columns to str (missing values stay
         missing instead of becoming the literal "nan"), and ``datetime`` to
         pandas datetimes (unparseable values become NaT).

    Parameters
    ----------
    df : pandas.DataFrame
        A chunk of the file. Columns not named in the lists below are left
        as they are after steps 1-2.

    Returns
    -------
    pandas.DataFrame
        A cleaned copy; the frame passed in is not modified.

    Raises
    ------
    KeyError
        If the frame has no ``datetime`` column.
    ValueError
        If a numeric column holds a value that cannot be parsed as a float.
    """
    # Work on a copy so re-running a cell never sees a half-cleaned input
    df = df.copy()

    # Trim spaces from string columns
    for col in df.columns:
        if pd.api.types.is_object_dtype(df[col]) or pd.api.types.is_string_dtype(df[col]):
            df[col] = df[col].str.strip()

    # Replace "NULL", "\N", "\\N" and blanks with NaN (np.NaN no longer exists in NumPy 2)
    df = df.replace(["NULL", "\\N", "\\\\N", ""], np.nan)

    # Type Conversion
    float_columns = [
        'register_no', 'emp_no', 'trans_no', 'department', 'quantity', 'Scale', 'cost', 'unitPrice', 'total', 'regPrice',
        'altPrice', 'tax', 'taxexempt', 'foodstamp', 'wicable', 'discount', 'memDiscount', 'discountable', 'discounttype',
        'voided', 'percentDiscount', 'ItemQtty', 'volDiscType', 'volume', 'VolSpecial', 'mixMatch', 'matched', 'numflag',
        'itemstatus', 'tenderstatus', 'varflag', 'local', 'organic', 'receipt', 'card_no', 'store', 'branch', 'match_id', 'trans_id'
    ]
    boolean_columns = ['memType', 'staff', 'batchHeaderID', 'display']
    string_columns = ['upc', 'description', 'trans_type', 'trans_subtype', 'trans_status', 'charflag']

    for col in float_columns:
        if col in df.columns:
            df[col] = df[col].astype(float)

    for col in boolean_columns:
        if col in df.columns:
            df[col] = _to_nullable_bool(df[col])

    for col in string_columns:
        if col in df.columns:
            # astype(str) alone would turn NaN into the text "nan"; mask it back out
            df[col] = df[col].astype(str).where(df[col].notna())

    df['datetime'] = pd.to_datetime(df['datetime'], errors='coerce')

    return df


def _to_nullable_bool(series):
    """Convert a column of text/numeric flags to the nullable "boolean" dtype.

    Numeric zero becomes False and any other numeric value True. Non-numeric,
    non-missing text stays True (its plain truthiness), and missing values
    stay missing. A bare ``astype(bool)`` would instead report True for the
    text "0" and for NaN.
    """
    numeric = pd.to_numeric(series, errors="coerce")
    # NaN != 0 evaluates to True, which keeps non-numeric text truthy
    flags = numeric.ne(0).astype("boolean")
    return flags.mask(series.isna())


# Walk the data directory and load every CSV into a BigQuery table of the same name
for root, dirs, files in os.walk(file_path):
    for file in files:
        full_path = os.path.join(root, file)

        # Anything that is not a CSV is ignored
        if not file.endswith('.csv'):
            continue

        print(f"Found CSV file: {file}")

        delimiter = detect_delimiter(full_path)
        print(f"Detected delimiter: {delimiter}")

        print(f"Reading CSV file in chunks: {file}...")
        chunk_iter = pd.read_csv(
            full_path,
            delimiter=delimiter,
            chunksize=chunk_size,
            dtype=str,
            low_memory=False,
        )

        # The table is named after the CSV file
        table_name = file.replace('.csv', '')
        table_id = f"{gbq_proj_id}.{dataset_id}.{table_name}"

        for idx, chunk_df in enumerate(chunk_iter):
            chunk_df = clean_dataframe(chunk_df)

            # Field names are lower-cased and stripped of ';' to comply with BigQuery rules
            chunk_df.columns = [name.lower().replace(';', '') for name in chunk_df.columns]

            print(f"Uploading chunk {idx + 1} to {table_name}...")
            # The first chunk (re)creates the table; every later chunk is appended to it
            pandas_gbq.to_gbq(
                chunk_df,
                table_id,
                project_id=gbq_proj_id,
                if_exists='replace' if idx == 0 else 'append',
                credentials=credentials,
            )
            del chunk_df  # Release the chunk before the next one is read

print("Upload complete.")


## Task 2: Export transactions for a random sample of card numbers

In [None]:
# BigQuery client for the project, built from the credentials loaded in the setup cell
client = bigquery.Client(credentials=credentials, project=gbq_proj_id)

# SQL query: pick 601 random card numbers (excluding card_no 3) and pull every
# transaction row that belongs to them.
# - Tables are addressed as project.dataset.table; the dataset must match
#   `dataset_id` ("wedge_data"), which is where the upload cell writes.
# - Only trans.* is selected: SELECT * across the join would return card_no
#   twice, and BigQuery rejects duplicate column names in a query result.
query = """
WITH rand_cte AS (
  SELECT DISTINCT card_no
  FROM `wedge-project-bt.wedge_data.transArchive_*`
  WHERE card_no != 3
  ORDER BY RAND()
  LIMIT 601
)

SELECT trans.*
FROM `wedge-project-bt.wedge_data.transArchive_*` AS trans
JOIN rand_cte
  ON rand_cte.card_no = trans.card_no
"""

# Run the query and get the result as a dataframe
df = client.query(query).to_dataframe(create_bqstorage_client=False)

# Save the dataframe as comma-separated text
df.to_csv("output_data1.txt", index=False)


## Task 3: Summarize sales by date and hour and store the result in SQLite

In [4]:


# Summary by date and hour: total spend, number of distinct transactions, and
# net item count (rows with trans_status 'V' or 'R' count as -1).
# - Tables are addressed as project.dataset.table; the dataset must match
#   `dataset_id` ("wedge_data"), which is where the upload cell writes.
# - The transaction key joins its parts with '-' so that, for example,
#   register 1 / employee 23 cannot collide with register 12 / employee 3.
query = """
SELECT
  EXTRACT(DATE FROM datetime) AS date,
  EXTRACT(HOUR FROM datetime) AS hour,
  SUM(total) AS spend,
  COUNT(DISTINCT CONCAT(
    CAST(EXTRACT(DATE FROM datetime) AS STRING), '-',
    CAST(register_no AS STRING), '-',
    CAST(emp_no AS STRING), '-',
    CAST(trans_no AS STRING))) AS trans,
  SUM(CASE
      WHEN trans_status IN ('V', 'R') THEN -1
      ELSE 1
    END) AS items
FROM
  `wedge-project-bt.wedge_data.transArchive*`
WHERE
  department NOT IN (0, 15)
  AND (trans_status IS NULL
    OR trans_status IN (' ', 'V', 'R'))
GROUP BY
  date, hour
ORDER BY
  date, hour;
"""


In [5]:
# SQLite connection to the local 'wedge-reporting.db' database file
conn = sqlite3.connect('wedge-reporting.db')


In [7]:
# Run the summary query in BigQuery and load the result into a DataFrame
holder = pandas_gbq.read_gbq(
    query,
    project_id=gbq_proj_id,
    credentials=credentials,
)

GenericGBQException: Reason: 403 Access Denied: Table umt-msba:transactions.transArchive*: User does not have permission to query table umt-msba:transactions.transArchive*, or perhaps it does not exist in location US.

Location: US
Job ID: 6eb2d261-9e58-4590-a11a-d85266a046da


In [10]:
# Preview the first rows of the query result
holder.head()

NameError: name 'holder' is not defined

In [None]:
# Write the query result to SQLite, replacing the table if it already exists.
# The DataFrame method is `to_sql`; `holder.to.sql` raises AttributeError.
# NOTE(review): 'date-hour.db' is used here as the *table* name (the database
# file is determined by `conn`) — confirm this is the intended table name.
holder.to_sql('date-hour.db', conn, if_exists='replace', index=False)

In [None]:
# Close the SQLite connection once all queries and writes above have finished

conn.close()