In [6]:
import pandas as pd
from zipfile36 import ZipFile
import pyarrow as pa
import pyarrow.parquet as pq
import io

In [None]:
# Build the opioid-transactions extract.
#
# Streams the tab-separated ARCOS file out of its ZIP archive in chunks,
# keeps only the rows for the selected states whose transaction year lies in
# the requested range, replaces TRANSACTION_DATE with integer
# TransactionYear / TransactionMonth columns, and writes the combined result
# to a Parquet file.

# File paths and parameters
zip_file_path = '/workspaces/arcos_all_washpost.zip'
csv_file_name = 'arcos_all_washpost.tsv'
chunk_size = 25000
keep_cols = ['BUYER_COUNTY', 'DOSAGE_UNIT', 'CALC_BASE_WT_IN_GM', 'BUYER_STATE', 'TRANSACTION_DATE', 'DRUG_NAME']
filtered_data = []

# States to keep and the inclusive range of transaction years to keep
keep_states = ["FL", "TX", "WA"]
first_year, last_year = 2003, 2015

# TRANSACTION_DATE is read as three '-'-separated integer fields
# (year-month-day). Values that do not match are treated as unparseable.
date_pattern = r'^(\d+)-(\d+)-(\d+)$'

# Open the ZIP file containing the CSV file
with ZipFile(zip_file_path, 'r') as zip_file:
    # Fail loudly if the member is missing: continuing would either raise a
    # NameError further down or silently reuse a stale `selected_data` left
    # over from an earlier run of this cell.
    if csv_file_name not in zip_file.namelist():
        raise FileNotFoundError(f"{csv_file_name} not found in the ZIP file.")

    # Open the CSV file from the ZIP archive
    with zip_file.open(csv_file_name) as csv_file:
        # Read the CSV file in chunks so the full file never has to fit in memory
        csv_reader = pd.read_csv(csv_file, delimiter='\t', chunksize=chunk_size, usecols=keep_cols, low_memory=False)

        for chunk in csv_reader:
            # Filter data for the selected states
            state_rows = chunk.loc[chunk['BUYER_STATE'].isin(keep_states)]

            # Parse the date row by row. A malformed date yields NaN for that
            # row only, instead of invalidating the whole chunk.
            date_parts = (
                state_rows['TRANSACTION_DATE']
                .astype(str)
                .str.strip()
                .str.extract(date_pattern)
            )
            year = pd.to_numeric(date_parts[0], errors='coerce')
            month = pd.to_numeric(date_parts[1], errors='coerce')

            # NaN years compare as False here, so unparseable rows are dropped
            in_range = year.between(first_year, last_year)

            # Build the output chunk without mutating the slice of `chunk`
            append_chunk = (
                state_rows.loc[in_range]
                .drop(columns=['TRANSACTION_DATE'])
                .assign(
                    TransactionYear=year[in_range].astype('int64'),
                    TransactionMonth=month[in_range].astype('int64'),
                )
            )

            # Append the processed chunk to the list
            filtered_data.append(append_chunk)

# Concatenate all processed chunks into a single DataFrame
selected_data = pd.concat(filtered_data, ignore_index=True)

# Specify the path where you want to save the Parquet file
parquet_file_path = 'opioid_transactions.parquet'

# Convert the Pandas DataFrame to a PyArrow Table
table = pa.Table.from_pandas(selected_data)

# Write the Table to a Parquet file
pq.write_table(table, parquet_file_path)


In [7]:
# Load the Parquet extract that is stored inside a ZIP archive into `df`.

# Archive on disk and the name of the Parquet member inside it
zip_file_path = 'opioid_transactions.zip'
pq_file_name = 'opioid_transactions.parquet'

with ZipFile(zip_file_path, 'r') as archive:
    if pq_file_name not in archive.namelist():
        # Member is missing: report it; `table` and `df` are not assigned in this branch
        print(f"{pq_file_name} not found in the ZIP file.")
    else:
        with archive.open(pq_file_name) as member:
            # Copy the whole member into an in-memory buffer for PyArrow to read from
            pq_buffer = io.BytesIO(member.read())

        # Parse the buffered bytes into a PyArrow Table ...
        table = pq.read_table(pq_buffer)

        # ... and convert it to a pandas DataFrame for analysis
        df = table.to_pandas()


In [None]:
# Load the opioid transaction extract and split it into one frame per state.
file_path = "opioid_transactions.parquet"
data = pd.read_parquet(file_path)

# Each subset keeps only the rows whose BUYER_STATE equals the state code.
florida = data.loc[data['BUYER_STATE'].eq('FL')]       # Florida
washington = data.loc[data['BUYER_STATE'].eq('WA')]    # Washington
texas = data.loc[data['BUYER_STATE'].eq('TX')]         # Texas

In [None]:
# Aggregate Florida transactions to one row per
# state / county / drug / year / month, summing the shipped weight and the
# dosage units.
#
# The columns to sum are selected explicitly before calling .sum(): the first
# positional parameter of GroupBy.sum is `numeric_only`, not a column list.
# as_index=False returns the grouping keys as ordinary columns.
florida_grouped = (
    florida
    .groupby(['BUYER_STATE', 'BUYER_COUNTY', 'DRUG_NAME', 'TransactionYear', 'TransactionMonth'], as_index=False)
    [['CALC_BASE_WT_IN_GM', 'DOSAGE_UNIT']]
    .sum()
)

# Save Florida grouped data to parquet file
file_name = 'florida.parquet'
florida_grouped.to_parquet(file_name)

In [None]:
# Aggregate Washington transactions to one row per
# state / county / drug / year / month, summing the shipped weight and the
# dosage units.
#
# The columns to sum are selected explicitly before calling .sum(): the first
# positional parameter of GroupBy.sum is `numeric_only`, not a column list.
# as_index=False returns the grouping keys as ordinary columns.
washington_grouped = (
    washington
    .groupby(['BUYER_STATE', 'BUYER_COUNTY', 'DRUG_NAME', 'TransactionYear', 'TransactionMonth'], as_index=False)
    [['CALC_BASE_WT_IN_GM', 'DOSAGE_UNIT']]
    .sum()
)

# Save Washington grouped data to parquet file
file_name = 'washington.parquet'
washington_grouped.to_parquet(file_name)

In [None]:
# Aggregate Texas transactions to one row per
# state / county / drug / year / month, summing the shipped weight and the
# dosage units.
#
# The columns to sum are selected explicitly before calling .sum(): the first
# positional parameter of GroupBy.sum is `numeric_only`, not a column list.
# as_index=False returns the grouping keys as ordinary columns.
texas_grouped = (
    texas
    .groupby(['BUYER_STATE', 'BUYER_COUNTY', 'DRUG_NAME', 'TransactionYear', 'TransactionMonth'], as_index=False)
    [['CALC_BASE_WT_IN_GM', 'DOSAGE_UNIT']]
    .sum()
)

# Save Texas grouped data to parquet file
file_name = 'texas.parquet'
texas_grouped.to_parquet(file_name)