In [26]:
import pandas as pd
import pyarrow.parquet as pq
from google.cloud import storage
import glob
import os
import gcsfs

In [27]:
# GCP Authentication Setup
def setup_gcp_auth(credentials_path=None):
    """
    Set up GCP authentication
    
    Args:
        credentials_path: Optional path to service account JSON key file
    """
    if credentials_path:
        # Explicitly use service account credentials by specifying the key file
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_path
        print(f"Using credentials from: {credentials_path}")
    else:
        # Check if credentials are already set
        if 'GOOGLE_APPLICATION_CREDENTIALS' in os.environ:
            print(f"Using credentials from environment variable")
        else:
            print("No credentials specified. Using default authentication")
            # This will use Application Default Credentials (ADC) if available
            # https://cloud.google.com/docs/authentication/application-default-credentials
    
    # Test authentication by listing buckets
    try:
        storage_client = storage.Client()
        buckets = list(storage_client.list_buckets(max_results=5))
        print(f"Authentication successful. Found {len(buckets)} buckets.")
        return True
    except Exception as e:
        print(f"Authentication failed: {e}")
        return False

In [28]:
# Method 1: Using pandas with gcsfs
def read_parquet_files_with_pandas(bucket_name, prefix):
    """
    Read multiple Parquet files from GCP using pandas and gcsfs
    """
    fs = gcsfs.GCSFileSystem()
        
    # List all parquet files in the path
    path_pattern = f"gs://{bucket_name}/{prefix}/*.parquet"
    file_list = fs.glob(path_pattern)
        
    if not file_list:
        print(f"No files found matching pattern: {path_pattern}")
        return pd.DataFrame()
        
    print(f"Found {len(file_list)} files matching pattern")
        
    # Read all the parquet files
    df = pd.concat([pd.read_parquet(file, filesystem=fs) for file in file_list])
    return df

In [30]:
credentials_path = "google_sa.json"  # Optional
setup_gcp_auth(credentials_path)

# Define bucket and path
bucket_name = "gcs-email-landing-inbox-ai"
prefix = "user_id/cron/emailid/emails"

# Read Parquet files using any of the methods
df = read_parquet_files_with_pandas(bucket_name, prefix)
# or
# df = read_parquet_files_by_downloading(bucket_name, prefix)
# or
# df = read_parquet_files_with_pyarrow(bucket_name, prefix)

# Display the first few rows
print(df.head())

Using credentials from: google_sa.json
Authentication successful. Found 1 buckets.
Found 4 files matching pattern
         message_id                                         from_email  \
0  1954dbd35c0e6cd7           BMS Talent Acquisition <noreply@bms.com>   
1  1954db9ec067665f  "Dunkin' Rewards" <dunkinrewards@emailinfo.dun...   
2  1954da04c3828c92              redBus <greetings@travel.e-redbus.in>   
3  1954d99646394c24             Peacock <no-reply@email.peacocktv.com>   
4  1954d78743152da6                          Chase <Chase@e.chase.com>   

                       to    cc   bcc  \
0    [pc612001@gmail.com]  None  None   
1  [<pc612001@gmail.com>]  None  None   
2    [pc612001@gmail.com]  None  None   
3    [pc612001@gmail.com]  None  None   
4    [pc612001@gmail.com]  None  None   

                                             subject  \
0   Spotlighting our People, Culture, and Innovation   
1                            Taste the hype for $3 💋   
2                  Pradnye

In [None]:
def upload_parquet_to_gcs(local_parquet_path, destination_blob_name):
    """
    Upload a Parquet file to GCS bucket
    
    Args:
        local_parquet_path: Path to local Parquet file
        destination_blob_name: Destination path in GCS bucket
    """
    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket("gcs-email-landing-inbox-ai")
        blob = bucket.blob(destination_blob_name)
        blob.upload_from_filename(local_parquet_path)
        print(f"File {local_parquet_path} uploaded to gs://{bucket_name}/{destination_blob_name}")
    except Exception as e:
        print(f"Error uploading file: {e}")

# Example usage:
# Assuming you have a local Parquet file
# upload_parquet_to_gcs("emails_batch_00004.parquet", "user_id/cron/emailid/emails/emails_batch_00004.parquet")

In [None]:
import chromadb
# Example setup of the client to connect to your chroma server
client = chromadb.HttpClient(host='localhost', port=8000)

<class 'pandas.core.frame.DataFrame'>
Index: 162 entries, 0 to 11
Data columns (total 13 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   message_id   162 non-null    object
 1   from_email   162 non-null    object
 2   to           162 non-null    object
 3   cc           3 non-null      object
 4   bcc          0 non-null      object
 5   subject      162 non-null    object
 6   date         162 non-null    object
 7   content      162 non-null    object
 8   plain_text   86 non-null     object
 9   html         161 non-null    object
 10  attachments  162 non-null    object
 11  thread_id    162 non-null    object
 12  labels       162 non-null    object
dtypes: object(13)
memory usage: 17.7+ KB


In [None]:
df['labels'] = df['labels'].astype(str)
# using apply function to create a new column
df['metadata'] = df.apply(lambda row: {"from":row.from_email,  "date":row.date, "labels":row.labels}, axis = 1)

# Print the DataFrame after addition
# of new column
print(df.metadata.to_list())

[{'from': 'BMS Talent Acquisition <noreply@bms.com>', 'date': '2025-02-28T18:06:40+00:00', 'labels': "['CATEGORY_PROMOTIONS' 'UNREAD' 'IMPORTANT' 'INBOX']"}, {'from': '"Dunkin\' Rewards" <dunkinrewards@emailinfo.dunkinrewards.com>', 'date': '2025-02-28T17:47:43+00:00', 'labels': "['CATEGORY_PROMOTIONS' 'UNREAD' 'INBOX']"}, {'from': 'redBus <greetings@travel.e-redbus.in>', 'date': '2025-02-28T17:35:05+00:00', 'labels': "['CATEGORY_PROMOTIONS' 'UNREAD' 'INBOX']"}, {'from': 'Peacock <no-reply@email.peacocktv.com>', 'date': '2025-02-28T17:27:29+00:00', 'labels': "['CATEGORY_PROMOTIONS' 'UNREAD' 'INBOX']"}, {'from': 'Chase <Chase@e.chase.com>', 'date': '2025-02-28T16:51:31+00:00', 'labels': "['IMPORTANT' 'CATEGORY_UPDATES' 'INBOX']"}, {'from': 'Meetup <info@email.meetup.com>', 'date': '2025-02-28T15:46:15+00:00', 'labels': "['IMPORTANT' 'CATEGORY_UPDATES' 'INBOX']"}, {'from': '"Yang (Meetup)" <Boston-IT-and-Tech-Meetup-announce@email.meetup.com>', 'date': '2025-02-28T15:19:30+00:00', 'label

In [None]:
def upload_to_chroma(user_id, df, client):
    """
    Upload data to Chroma
    """
    try:
        collection = client.get_or_create_collection(name=user_id)
        
        # Upload data to Chroma
        collection.upsert(
            documents=df.subject.to_list(),
            metadatas=df.metadata.to_list(),
            ids=df.message_id.to_list()
        )
    except Exception as e:
        print(f"Error uploading data to Chroma: {e}")

In [36]:
upload_to_chroma("test_id", df, client)