In [None]:
from google.oauth2 import service_account
from googleapiclient.discovery import build
import pytz, os,json, requests
import pandas as pd
from datetime import datetime, timedelta
from io import BytesIO
from googleapiclient.errors import HttpError
import googleapiclient.http

# Service-account key used for authentication and the OAuth scope requested
# for it (full Google Drive access).
SERVICE_ACCOUNT_FILE = "/Users/sachin/TheJuniorDataScientist/credentials/sachin_service account.json"
SCOPES = ['https://www.googleapis.com/auth/drive']


def authenticate_drive_service():
    """Create a Google Drive v3 client authorised with the service-account key.

    Returns:
        The API resource produced by ``googleapiclient.discovery.build``.

    Raises:
        Exception: any failure while loading the key file or building the
        client is printed and then re-raised unchanged.
    """
    try:
        creds = service_account.Credentials.from_service_account_file(
            SERVICE_ACCOUNT_FILE,
            scopes=SCOPES,
        )
        service = build('drive', 'v3', credentials=creds)
        print("Google Drive service authorized successfully.")
        return service
    except Exception as exc:
        print("Error during authentication:", exc)
        raise

# Generate date range (YYYYMMDD format)
def get_date_range(local_tz, start_date=None, end_date=None, days=None):
    """Return the ``(start, end)`` dates of a look-back window as ``YYYYMMDD`` strings.

    Args:
        local_tz: timezone used for "now" and for interpreting explicit dates.
            Accepts a pytz zone (as used in this notebook) or any
            ``datetime.tzinfo``.
        start_date: optional explicit start date, ``"YYYYMMDD"``.
        end_date: optional explicit end date, ``"YYYYMMDD"``. Defaults to the
            current time in *local_tz*.
        days: how far before *end_date* the window starts when *start_date*
            is not given. ``None`` means 10 days; ``0`` means a single day.

    Returns:
        tuple[str, str]: ``(start, end)`` formatted as ``YYYYMMDD``.
    """
    def _parse(value):
        naive = datetime.strptime(value, "%Y%m%d")
        # Attach local_tz to the parsed calendar date. Calling .astimezone() on
        # a naive datetime treats it as *system* local time and converts it,
        # which shifts the date by a day on machines east of local_tz.
        if hasattr(local_tz, "localize"):  # pytz zones must use localize()
            return local_tz.localize(naive)
        return naive.replace(tzinfo=local_tz)

    end = _parse(end_date) if end_date else datetime.now(local_tz)

    if start_date:
        start = _parse(start_date)
    else:
        # `days is None` (not `not days`) so that days=0 yields a one-day range.
        start = end - timedelta(days=10 if days is None else days)

    return start.strftime("%Y%m%d"), end.strftime("%Y%m%d")

def fetch_files_with_dates(drive_service, folder_id, date_range):
    """List CSV files in a Drive folder whose names contain one of the given dates.

    Args:
        drive_service: Drive v3 client (see ``authenticate_drive_service``).
        folder_id: ID of the folder whose direct children are searched.
        date_range: ``YYYYMMDD`` strings; a file matches when any of them is a
            substring of its name.

    Returns:
        list[dict]: the Drive metadata (``id``, ``name``, ``size``) of each
        matching file, plus ``size_mb`` (size / 1024**2 rounded to 2 places,
        0 when Drive reports no size).

    Raises:
        HttpError: printed and re-raised if a Drive API call fails.
    """
    try:
        # Drive query literals are single-quoted; escape backslashes and quotes
        # so the folder id cannot break out of the literal.
        safe_folder_id = folder_id.replace("\\", "\\\\").replace("'", "\\'")
        query = f"'{safe_folder_id}' in parents and mimeType='text/csv' and trashed=false"

        files = []
        page_token = None  # None requests the first page

        while True:
            # pageSize=1000 is the API maximum (the default is 100), so folders
            # with thousands of files need far fewer round trips.
            response = drive_service.files().list(
                q=query,
                fields="nextPageToken, files(id, name, size)",
                pageSize=1000,
                pageToken=page_token,
            ).execute()

            files.extend(response.get('files', []))

            page_token = response.get('nextPageToken')
            if not page_token:
                break  # No more pages, exit loop

        print(f"Total files found in the folder: {len(files)}")

        # Keep files whose name contains any requested date; add size in MB.
        filtered_files = [
            {**file, 'size_mb': round(int(file.get('size', 0)) / (1024 * 1024), 2)}
            for file in files
            if any(date in file['name'] for date in date_range)
        ]

        print(f"Number of files matching the date range: {len(filtered_files)}")
        return filtered_files
    except HttpError as error:
        print(f"An error occurred: {error}")
        raise



def process_files(drive_service, files, required_columns, checkpoint_file="download_progress.json", save_interval=50 * 1024 * 1024, chunksize=10 * 1024 * 1024, num_retries=3):
    """Download each Drive CSV in *files* and concatenate the requested columns.

    Each file is downloaded completely into memory, parsed with
    ``pd.read_csv`` restricted to *required_columns*, tagged with a
    ``source_file`` column, and appended to the result. Errors on one file are
    printed and the loop continues with the next file.

    The JSON checkpoint maps file name -> bytes received in the current or an
    interrupted attempt. An entry is removed once the file has been fully
    downloaded. The partial bytes themselves are not persisted, so a file found
    in the checkpoint is re-downloaded from the start. The earlier behaviour
    skipped such files, which silently dropped their rows from the result.

    Args:
        drive_service: Drive v3 client (see ``authenticate_drive_service``).
        files: dicts with at least ``id`` and ``name`` (``size`` optional),
            e.g. as returned by ``fetch_files_with_dates``.
        required_columns: columns read from every CSV, in output order.
        checkpoint_file: path of the JSON progress file.
        save_interval: write the checkpoint roughly every this many bytes
            (and at least once a minute while downloading).
        chunksize: download chunk size in bytes.
        num_retries: retries per chunk for transient failures, performed by
            ``MediaIoBaseDownload.next_chunk``.

    Returns:
        pd.DataFrame: all parsed rows (NaN ``to_qty`` filled with 0 when that
        column is present), or an empty DataFrame if nothing was parsed.
    """
    combined_data = []
    last_save_time = datetime.now()

    try:
        with open(checkpoint_file, "r") as f:
            download_progress = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        download_progress = {}

    def _save_checkpoint():
        # Persist the current progress map to disk.
        with open(checkpoint_file, "w") as f:
            json.dump(download_progress, f)

    for i, file in enumerate(files):
        file_name = file.get('name', '<unknown>')
        try:
            file_id = file['id']
            file_size = int(file.get('size', 0))

            # Nothing to download (empty file or no size reported by Drive).
            if file_size <= 0:
                print(f"{file_name} is empty or has no downloadable content. Skipping...")
                continue

            previous_bytes = download_progress.get(file_name, 0)
            if previous_bytes:
                print(f"\nFile number {i+1} - Downloading file: {file_name} (previous attempt stopped at {previous_bytes} bytes; restarting from 0)")
            else:
                print(f"\nFile number {i+1} - Downloading file: {file_name}")

            request = drive_service.files().get_media(fileId=file_id)
            file_stream = BytesIO()
            downloader = googleapiclient.http.MediaIoBaseDownload(file_stream, request, chunksize=chunksize)

            done = False
            while not done:
                try:
                    status, done = downloader.next_chunk(num_retries=num_retries)
                except (requests.ConnectionError, ConnectionError, TimeoutError, HttpError) as e:
                    print(f"Network error during download of {file_name}: {e}. Progress saved; re-run to retry.")
                    break

                # The stream position is the total number of bytes received so
                # far for this file (the old code re-added the whole buffer
                # length on every chunk, overcounting).
                bytes_downloaded = file_stream.tell()
                download_progress[file_name] = bytes_downloaded

                if (datetime.now() - last_save_time).total_seconds() > 60 or bytes_downloaded % save_interval < chunksize:
                    _save_checkpoint()
                    last_save_time = datetime.now()

                print(f"Download {int(status.progress() * 100)}% complete for {file_name}.")

            if not done:
                # Never parse a truncated download as if it were complete.
                _save_checkpoint()
                continue

            # Download finished: the checkpoint entry is no longer needed.
            if download_progress.pop(file_name, None) is not None:
                _save_checkpoint()

            file_stream.seek(0)
            df = pd.read_csv(file_stream, usecols=required_columns)
            df = df[required_columns]  # usecols does not preserve the requested order
            df['source_file'] = file_name
            combined_data.append(df)

        except Exception as e:
            print(f"Error processing file {file_name}: {e}")

    combined_data = [df for df in combined_data if not df.empty]
    if not combined_data:
        # Also covers "every parsed file was empty", which used to return None.
        print("No files were successfully processed.")
        return pd.DataFrame()

    final_df = pd.concat(combined_data, ignore_index=True)
    if 'to_qty' in final_df.columns:
        final_df['to_qty'] = final_df['to_qty'].fillna(0)
    print("All files processed and merged successfully.")
    return final_df


# Step 1: Authenticate Drive Service
drive_service = authenticate_drive_service()

# Step 2: Work out which YYYYMMDD stamps to look for in file names
local_tz = pytz.timezone("Asia/Kolkata")
start_date_input = None  # e.g. "20250201"
end_date_input = None    # e.g. "20250225"
days_range = 2

start_date, end_date = get_date_range(local_tz, start_date_input, end_date_input, days_range)
date_range = pd.date_range(start=start_date, end=end_date).strftime("%Y%m%d").tolist()
print(f"Date range for filtering: {start_date} to {end_date}")

# Step 3: Fetch Files Matching the Date Range
folder_id = '1akFe2_iKCqoZ0lAETRipXxDSKYnFmx2x'
filtered_files = fetch_files_with_dates(drive_service, folder_id, date_range)
file_df = pd.DataFrame(filtered_files)
# errors='ignore': when nothing matched, the frame has no columns at all and a
# plain drop (or indexing 'size_mb') would raise KeyError.
file_df = file_df.drop(columns=['size', 'id'], errors='ignore')
total_mb = file_df['size_mb'].sum() if 'size_mb' in file_df.columns else 0
print(file_df, f"\n\nTotal Data to be processed {total_mb} MB")

# Step 4: Process Files and Combine Data
required_columns = ['dd', 'mm', 'yyyy', 'mh_code', 'type', 'store_name', 'to_qty', 'store_id', 'product_variant_id']
final_df = process_files(drive_service, filtered_files, required_columns)

if final_df is None or final_df.empty:
    # Defensive: treat a missing or empty result as "no data" instead of
    # crashing in dropna/groupby on absent columns.
    print("No data was loaded; skipping the transformation steps.")
    main_df = pd.DataFrame()
else:
    print('Total rows before removing null values :', final_df.shape[0])
    ### Printing Null Records
    # final_df[final_df[['dd', 'mm', 'yyyy', 'mh_code', 'type', 'store_name', 'store_id']].isna().all(axis=1)]

    # Drop rows where every key column is missing.
    main_df = final_df.dropna(subset=['dd', 'mm', 'yyyy', 'mh_code', 'type', 'store_name', 'store_id', 'product_variant_id'], how='all').copy()
    columns_to_convert = ['dd', 'mm', 'yyyy']
    try:
        main_df[columns_to_convert] = main_df[columns_to_convert].fillna(1).astype(int)
    except Exception as e:
        print(f"Error during  conversion: {e}")
    # Rename outside the try so a failed cast does not also leave the
    # year/month/day columns missing for the date assembly below.
    main_df.rename(columns={'yyyy': 'year', 'mm': 'month', 'dd': 'day'}, inplace=True)
    try:
        # Creating the date column from day, month, year. errors='coerce' turns
        # impossible dates (e.g. a missing yyyy filled with 1) into NaT instead
        # of failing the whole column.
        main_df['date'] = pd.to_datetime(main_df[['year', 'month', 'day']], errors='coerce')
        invalid_dates = int(main_df['date'].isna().sum())
        if invalid_dates:
            print(f"{invalid_dates} rows have an invalid day/month/year (NaT); the groupby below drops them.")
        print("Date column created successfully!")
    except Exception as e:
        print(f"Error during date conversion: {e}")

    # groupby drops rows whose key columns contain NaN/NaT (pandas default dropna=True).
    main_df = main_df.groupby(['type', 'mh_code', 'store_name', 'store_id', 'source_file', 'date', 'product_variant_id']).agg({'to_qty': 'sum'}).reset_index()
    main_df.rename(columns={'store_name': 'Destination_store_name', 'store_id': 'Destination_store_id'}, inplace=True)


Google Drive service authorized successfully.
Date range for filtering: 20250309 to 20250314
Total files found in the folder: 4142
Number of files matching the date range: 34
                      name  size_mb
0         20250313_CHN.csv    13.15
1         20250313_KOL.csv     4.45
2         20250313_NCR.csv    38.53
3      20250313_Mumbai.csv    21.59
4         20250313_HYD.csv    16.48
5        20250313_Pune.csv     6.16
6   20250313_Bengaluru.csv    27.81
7         20250312_CHN.csv    13.09
8         20250312_NCR.csv    39.70
9         20250312_HYD.csv    15.42
10        20250312_KOL.csv     5.91
11       20250312_Pune.csv     6.24
12  20250312_Bengaluru.csv    28.10
13        20250311_HYD.csv    14.94
14        20250311_CHN.csv    11.48
15        20250311_KOL.csv     4.76
16        20250311_NCR.csv    41.31
17  20250311_Bengaluru.csv    26.16
18     20250311_Mumbai.csv    18.48
19       20250311_Pune.csv     5.33
20        20250310_CHN.csv    11.66
21        20250310_KOL.csv     4.

  df = pd.read_csv(file_stream, usecols=required_columns)


Download 81% complete for 20250309_CHN.csv.
Download 100% complete for 20250309_CHN.csv.

File number 31 - Downloading file: 20250309_KOL.csv (Resuming from 0 bytes)
Download 100% complete for 20250309_KOL.csv.

File number 32 - Downloading file: 20250309_Mumbai.csv (Resuming from 0 bytes)
Download 48% complete for 20250309_Mumbai.csv.
Download 97% complete for 20250309_Mumbai.csv.
Download 100% complete for 20250309_Mumbai.csv.

File number 33 - Downloading file: 20250309_Pune.csv (Resuming from 0 bytes)


  df = pd.read_csv(file_stream, usecols=required_columns)


Download 100% complete for 20250309_Pune.csv.

File number 34 - Downloading file: 20250309_Bengaluru.csv (Resuming from 0 bytes)
Download 42% complete for 20250309_Bengaluru.csv.
Download 85% complete for 20250309_Bengaluru.csv.
Download 100% complete for 20250309_Bengaluru.csv.
All files processed and merged successfully.
Total rows before removing null values : 5299656
Date column created successfully!


In [4]:
# Display the aggregated table (to_qty summed per type/mh_code/store/source_file/date/product_variant_id) built in the first cell.
main_df

Unnamed: 0,type,mh_code,Destination_store_name,Destination_store_id,source_file,date,product_variant_id,to_qty
0,COLD,KOL040M,KOL- Rickjoni,dd246eef-6cb6-4358-b455-9b46c1c444ca,20250309_KOL.csv,2025-03-09,03ea892c-6de0-423a-a2a2-e531f220fb86,2.0
1,COLD,KOL040M,KOL- Rickjoni,dd246eef-6cb6-4358-b455-9b46c1c444ca,20250309_KOL.csv,2025-03-09,0b30de60-36d7-4789-a325-66c44d8ba9d2,1.0
2,COLD,KOL040M,KOL- Rickjoni,dd246eef-6cb6-4358-b455-9b46c1c444ca,20250309_KOL.csv,2025-03-09,0cb36910-19d9-402f-99d8-37170972819e,4.0
3,COLD,KOL040M,KOL- Rickjoni,dd246eef-6cb6-4358-b455-9b46c1c444ca,20250309_KOL.csv,2025-03-09,0dc04b66-61e5-465a-954c-174270062b30,1.0
4,COLD,KOL040M,KOL- Rickjoni,dd246eef-6cb6-4358-b455-9b46c1c444ca,20250309_KOL.csv,2025-03-09,1363a5ff-b43b-439b-a814-a0982e8559de,1.0
...,...,...,...,...,...,...,...,...
5228623,Dry,PAT002M,SAS-Zirakpur Apple Heights,7b2176ce-862d-4e58-9d5e-6373e729c2a6,20250313_NCR.csv,2025-03-13,fddeda02-caff-4a4c-96e6-580d46e5c31b,2.0
5228624,Dry,PAT002M,SAS-Zirakpur Apple Heights,7b2176ce-862d-4e58-9d5e-6373e729c2a6,20250313_NCR.csv,2025-03-13,fe5fd19f-16e9-4772-848e-0bb472ad6797,1.0
5228625,Dry,PAT002M,SAS-Zirakpur Apple Heights,7b2176ce-862d-4e58-9d5e-6373e729c2a6,20250313_NCR.csv,2025-03-13,fe9e0cd3-014c-4407-af64-7ca3ec180513,2.0
5228626,Dry,PAT002M,SAS-Zirakpur Apple Heights,7b2176ce-862d-4e58-9d5e-6373e729c2a6,20250313_NCR.csv,2025-03-13,ff2f7925-64d6-4fd5-8fe8-c093de4c6488,2.0


In [1]:
import os
import json
import requests
import pandas as pd
import pytz
from io import BytesIO
from datetime import datetime, timedelta
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import googleapiclient.http


In [2]:
# ===========================
# CONFIGURATION & AUTHENTICATION
# ===========================

# Configuration
SERVICE_ACCOUNT_FILE = "/Users/sachin/TheJuniorDataScientist/credentials/sachin_service account.json"
SCOPES = ['https://www.googleapis.com/auth/drive']  # full Drive access
CHECKPOINT_FILE = "download_progress.json"  # JSON map: file name -> bytes received
CHUNKSIZE = 10 * 1024 ** 2  # 10 MB per download request
SAVE_INTERVAL = 50 * 1024 ** 2  # 50 MB; not referenced by the code visible here


def authenticate_drive_service():
    """Create a Google Drive v3 client authorised with the service-account key.

    Returns:
        The API resource produced by ``googleapiclient.discovery.build``.

    Raises:
        Exception: any failure while loading the key file or building the
        client is printed and then re-raised unchanged.
    """
    try:
        creds = service_account.Credentials.from_service_account_file(
            SERVICE_ACCOUNT_FILE,
            scopes=SCOPES,
        )
        service = build('drive', 'v3', credentials=creds)
        print("✅ Google Drive service authorized successfully.")
        return service
    except Exception as exc:
        print("❌ Error during authentication:", exc)
        raise


In [3]:
# ===========================
# DATE RANGE HANDLING
# ===========================

# Generate date range (YYYYMMDD format)
def get_date_range(local_tz, start_date=None, end_date=None, days=None):
    """Return the ``(start, end)`` dates of a look-back window as ``YYYYMMDD`` strings.

    Args:
        local_tz: timezone used for "now" and for interpreting explicit dates.
            Accepts a pytz zone (as used in this notebook) or any
            ``datetime.tzinfo``.
        start_date: optional explicit start date, ``"YYYYMMDD"``.
        end_date: optional explicit end date, ``"YYYYMMDD"``. Defaults to the
            current time in *local_tz*.
        days: how far before *end_date* the window starts when *start_date*
            is not given. ``None`` means 10 days; ``0`` means a single day.

    Returns:
        tuple[str, str]: ``(start, end)`` formatted as ``YYYYMMDD``.
    """
    def _parse(value):
        naive = datetime.strptime(value, "%Y%m%d")
        # Attach local_tz to the parsed calendar date. Calling .astimezone() on
        # a naive datetime treats it as *system* local time and converts it,
        # which shifts the date by a day on machines east of local_tz.
        if hasattr(local_tz, "localize"):  # pytz zones must use localize()
            return local_tz.localize(naive)
        return naive.replace(tzinfo=local_tz)

    end = _parse(end_date) if end_date else datetime.now(local_tz)

    if start_date:
        start = _parse(start_date)
    else:
        # `days is None` (not `not days`) so that days=0 yields a one-day range.
        start = end - timedelta(days=10 if days is None else days)

    return start.strftime("%Y%m%d"), end.strftime("%Y%m%d")


In [None]:
# ===========================
# FETCH FILES FROM GOOGLE DRIVE
# ===========================

def fetch_files_with_dates(drive_service, folder_id, date_range):
    """List CSV files in a Drive folder whose names contain one of the given dates.

    Args:
        drive_service: Drive v3 client (see ``authenticate_drive_service``).
        folder_id: ID of the folder whose direct children are searched.
        date_range: ``YYYYMMDD`` strings; a file matches when any of them is a
            substring of its name.

    Returns:
        list[dict]: the Drive metadata (``id``, ``name``, ``size``) of each
        matching file, plus ``size_mb`` (size / 1024**2 rounded to 2 places,
        0 when Drive reports no size).

    Raises:
        HttpError: printed and re-raised if a Drive API call fails.
    """
    try:
        # Drive query literals are single-quoted; escape backslashes and quotes
        # so the folder id cannot break out of the literal.
        safe_folder_id = folder_id.replace("\\", "\\\\").replace("'", "\\'")
        query = f"'{safe_folder_id}' in parents and mimeType='text/csv' and trashed=false"
        files = []
        page_token = None

        while True:
            # pageSize=1000 is the API maximum (the default is 100), so folders
            # with thousands of files need far fewer round trips.
            response = drive_service.files().list(
                q=query,
                fields="nextPageToken, files(id, name, size)",
                pageSize=1000,
                pageToken=page_token,
            ).execute()

            files.extend(response.get('files', []))
            page_token = response.get('nextPageToken')
            if not page_token:
                break

        print(f"📂 Total files found in the folder: {len(files)}")

        # Filter files by date
        filtered_files = [
            {**file, 'size_mb': round(int(file.get('size', 0)) / (1024 * 1024), 2)}
            for file in files
            if any(date in file['name'] for date in date_range)
        ]

        print(f"📅 Number of files matching the date range: {len(filtered_files)}")
        return filtered_files
    except HttpError as error:
        print(f"❌ An error occurred: {error}")
        raise


In [None]:
# ===========================
# FETCH FILES FROM GOOGLE DRIVE (Supports Multiple File Types)
# ===========================

def fetch_files_with_dates(drive_service, folder_id, date_range, file_types=None):
    """
    Fetches files from Google Drive within the specified folder, date range, and file types.

    Args:
        drive_service (Resource): Google Drive service resource.
        folder_id (str)         : The ID of the Google Drive folder to search (direct children only).
        date_range (list)       : List of date strings (YYYYMMDD) to filter files by their name.
        file_types (list)       : File extensions to keep (e.g., ['csv', 'xlsx']). Case and a
                                  leading dot are ignored ('.CSV' == 'csv'). None or empty
                                  means no type filter (every non-trashed item in the folder).

    Returns:
        list: Metadata dicts (id, name, size) of matching files, each with an added
              'size_mb' key. Empty if file_types was given but none of its entries
              is supported.

    Raises:
        HttpError: printed and re-raised if a Drive API call fails.
    """
    # Supported MIME types for different file extensions
    mime_type_mapping = {
        'csv': 'text/csv',
        'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        'xls': 'application/vnd.ms-excel',
        'json': 'application/json',
        'txt': 'text/plain',
        'pdf': 'application/pdf',
        'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        'doc': 'application/msword',
        'pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
        'ppt': 'application/vnd.ms-powerpoint',
        'html': 'text/html',
        'zip': 'application/zip',
        'rar': 'application/vnd.rar',
        'tar': 'application/x-tar',
        'gz': 'application/gzip',
        'jpg': 'image/jpeg',
        'jpeg': 'image/jpeg',
        'png': 'image/png',
        'gif': 'image/gif',
        'mp4': 'video/mp4',
        'mp3': 'audio/mpeg'}

    # Build one mimeType clause per supported, distinct type.
    mime_queries = []
    if file_types:
        for file_type in file_types:
            ext = str(file_type).lower().lstrip('.')
            if ext in mime_type_mapping:
                clause = f"mimeType='{mime_type_mapping[ext]}'"
                if clause not in mime_queries:  # e.g. jpg and jpeg share a MIME type
                    mime_queries.append(clause)
            else:
                print(f"⚠️ Warning: Unsupported file type '{file_type}'. Skipping it.")
        if not mime_queries:
            # Without this guard the type filter would silently vanish and every
            # file in the folder would be returned.
            print("⚠️ Warning: None of the requested file types are supported. No files returned.")
            return []

    # Combine file type queries using OR if specified
    file_type_query = f" and ({' or '.join(mime_queries)})" if mime_queries else ""

    try:
        # Drive query literals are single-quoted; escape backslashes and quotes.
        safe_folder_id = folder_id.replace("\\", "\\\\").replace("'", "\\'")
        query = f"'{safe_folder_id}' in parents and trashed=false{file_type_query}"
        files = []
        page_token = None

        while True:
            # pageSize=1000 is the API maximum (the default is 100).
            response = drive_service.files().list(
                q=query,
                fields="nextPageToken, files(id, name, size)",
                pageSize=1000,
                pageToken=page_token,
            ).execute()

            files.extend(response.get('files', []))
            page_token = response.get('nextPageToken')
            if not page_token:
                break

        print(f"📂 Total files found in the folder: {len(files)}")

        # Filter files by date in the filename
        filtered_files = [
            {**file, 'size_mb': round(int(file.get('size', 0)) / (1024 * 1024), 2)}
            for file in files
            if any(date in file['name'] for date in date_range)
        ]

        print(f"📅 Number of files matching the date range: {len(filtered_files)}")
        return filtered_files
    except HttpError as error:
        print(f"❌ An error occurred: {error}")
        raise


In [5]:
# ===========================
# FILE PROCESSING & DOWNLOADING
# ===========================

def process_files(drive_service, files, required_columns, num_retries=3):
    """Download each Drive CSV in *files*, parse it and report per-file outcomes.

    Each file is downloaded completely into memory, parsed with
    ``pd.read_csv`` restricted to *required_columns*, and tagged with a
    ``source_file`` column.

    ``CHECKPOINT_FILE`` maps file name -> bytes received in the current or an
    interrupted attempt. The entry is removed once the download completes.
    Partial bytes are not persisted, so a file found in the checkpoint is
    re-downloaded from the start. The earlier behaviour skipped such files,
    which silently dropped their rows from the result.

    Args:
        drive_service: Drive v3 client (see ``authenticate_drive_service``).
        files: dicts with ``id`` and ``name`` (``size`` optional), e.g. from
            ``fetch_files_with_dates``.
        required_columns: columns read from every CSV.
        num_retries: retries per chunk for transient failures, performed by
            ``MediaIoBaseDownload.next_chunk``.

    Returns:
        tuple: ``(final_df, summary_report)``. ``final_df`` concatenates all
        parsed files (NaN ``to_qty`` filled with 0 when that column exists) or
        is empty. ``summary_report`` lists file names under:

        - "Resumed Files": interrupted by a network error; re-run to retry.
        - "Fully Downloaded Files": downloaded and parsed.
        - "Skipped Files": empty, or no size reported by Drive.
        - "Failed Files": downloaded but could not be parsed.
    """
    combined_data = []
    summary_report = {"Resumed Files": [], "Fully Downloaded Files": [], "Skipped Files": [], "Failed Files": []}

    try:
        with open(CHECKPOINT_FILE, "r") as f:
            download_progress = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        download_progress = {}

    def _save_progress():
        # Persist the current progress map to disk.
        with open(CHECKPOINT_FILE, "w") as f:
            json.dump(download_progress, f)

    for file in files:
        file_id = file['id']
        file_name = file['name']
        file_size = int(file.get('size', 0))

        if file_size <= 0:
            print(f"⏩ {file_name} is empty or has no downloadable content. Skipping...")
            summary_report["Skipped Files"].append(file_name)
            continue

        previous_bytes = download_progress.get(file_name, 0)
        if previous_bytes:
            print(f"\n⬇️ Downloading file: {file_name} (previous attempt stopped at {previous_bytes} bytes; restarting from 0)")
        else:
            print(f"\n⬇️ Downloading file: {file_name}")

        request = drive_service.files().get_media(fileId=file_id)
        file_stream = BytesIO()
        downloader = googleapiclient.http.MediaIoBaseDownload(file_stream, request, chunksize=CHUNKSIZE)

        done = False
        try:
            while not done:
                status, done = downloader.next_chunk(num_retries=num_retries)
                # Stream position == total bytes received so far for this file
                # (the old code re-added the whole buffer length each chunk).
                download_progress[file_name] = file_stream.tell()
                _save_progress()
                print(f"✅ Download {int(status.progress() * 100)}% complete for {file_name}.")
        except (requests.ConnectionError, ConnectionError, TimeoutError, HttpError) as e:
            print(f"❌ Network error during download: {e}. Saving progress...")
            summary_report["Resumed Files"].append(file_name)
            continue

        # Download complete: drop its checkpoint entry before parsing.
        download_progress.pop(file_name, None)
        _save_progress()

        try:
            file_stream.seek(0)
            df = pd.read_csv(file_stream, usecols=required_columns)
        except ValueError as e:  # includes missing usecols, ParserError, EmptyDataError
            print(f"❌ Could not parse {file_name}: {e}")
            summary_report["Failed Files"].append(file_name)
            continue

        df['source_file'] = file_name
        combined_data.append(df)
        summary_report["Fully Downloaded Files"].append(file_name)

    if combined_data:
        final_df = pd.concat(combined_data, ignore_index=True)
        if 'to_qty' in final_df.columns:
            final_df['to_qty'] = final_df['to_qty'].fillna(0)
        return final_df, summary_report
    return pd.DataFrame(), summary_report


In [6]:
# ===========================
# MAIN EXECUTION FLOW
# ===========================

if __name__ == "__main__":
    # Authorise once; the same client is used for listing and downloading.
    drive_service = authenticate_drive_service()

    # YYYYMMDD stamps from (now - 2 days) to now, evaluated in IST.
    local_tz = pytz.timezone("Asia/Kolkata")
    start_date, end_date = get_date_range(local_tz, days=2)
    date_range = (
        pd.date_range(start=start_date, end=end_date)
        .strftime("%Y%m%d")
        .tolist()
    )

    # Files in the source folder whose names contain one of those stamps.
    folder_id = '1akFe2_iKCqoZ0lAETRipXxDSKYnFmx2x'
    filtered_files = fetch_files_with_dates(drive_service, folder_id, date_range)

    # Download, parse and merge; summary_report buckets file names by outcome.
    required_columns = [
        'dd', 'mm', 'yyyy', 'mh_code', 'type', 'store_name',
        'to_qty', 'store_id', 'product_variant_id',
    ]
    final_df, summary_report = process_files(
        drive_service, filtered_files, required_columns
    )

    # One line per outcome category with its file count.
    print("\n📌 Summary Report:")
    for category, files in summary_report.items():
        print(f"{category}: {len(files)} files")


✅ Google Drive service authorized successfully.
📂 Total files found in the folder: 4142
📅 Number of files matching the date range: 13

⬇️ Downloading file: 20250313_CHN.csv (Resuming from 0 bytes)
✅ Download 76% complete for 20250313_CHN.csv.
✅ Download 100% complete for 20250313_CHN.csv.

⬇️ Downloading file: 20250313_KOL.csv (Resuming from 0 bytes)
✅ Download 100% complete for 20250313_KOL.csv.

⬇️ Downloading file: 20250313_NCR.csv (Resuming from 0 bytes)
✅ Download 25% complete for 20250313_NCR.csv.
✅ Download 51% complete for 20250313_NCR.csv.
✅ Download 77% complete for 20250313_NCR.csv.
✅ Download 100% complete for 20250313_NCR.csv.

⬇️ Downloading file: 20250313_Mumbai.csv (Resuming from 0 bytes)
✅ Download 46% complete for 20250313_Mumbai.csv.
✅ Download 92% complete for 20250313_Mumbai.csv.
✅ Download 100% complete for 20250313_Mumbai.csv.

⬇️ Downloading file: 20250313_HYD.csv (Resuming from 0 bytes)
✅ Download 60% complete for 20250313_HYD.csv.
✅ Download 100% complete for 