In [None]:
import os
import tqdm
import warnings
import argparse
import dask.dataframe as dd
import dask_geopandas as dg
import geopandas as gpd
import pandas as pd
from shapely.geometry import Point
from dask.distributed import Client
import gcsfs
from google.oauth2 import service_account
from google_auth_oauthlib.flow import InstalledAppFlow
# Use the credentials in your application
from google.cloud import storage

import dask.dataframe as dd
from gcp_config import CLIENT_SECRETS_FILE

# Path to your downloaded OAuth client credentials
client_secrets_file = CLIENT_SECRETS_FILE

# Scopes define the level of access requested (adjust as needed)
SCOPES = ["https://www.googleapis.com/auth/cloud-platform"]

# Launch OAuth flow to authenticate with your Google account
flow = InstalledAppFlow.from_client_secrets_file(client_secrets_file, SCOPES)
creds = flow.run_local_server(port=0)


client = storage.Client(credentials=creds)
fs = gcsfs.GCSFileSystem(token=client._credentials)
bucket_name = "ingest-unacast-data"
folder_name = "activity/il"

bucket = client.bucket(bucket_name)



# Ignore all warnings
warnings.filterwarnings("ignore")

# Define target columns as per your requirement
target_columns = ['identifier', 'identifier_type', 'timestamp', 'local_date_time','province',
      'classification', 'duration_seconds',
       'centroid_latitude', 'centroid_longitude',
       'bump_count', 'potential_start_time',
       'potential_end_time']

  # Replace with actual column names
target_provinces = ['HaMerkaz', 'Tel Aviv', 'HaDarom']

# # Define the path to files
# parent_folder = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
# path2files = os.path.join(parent_folder, "habidatum_folder")


dtype_dict = {
    'local_date_time': 'datetime64[us]',
    'potential_start_time': 'datetime64[us]',
    'potential_end_time': 'datetime64[us]',
    'centroid_longitude': 'float64',
    'centroid_latitude': 'float64',
    'bump_count': 'int64',
    'duration_seconds': 'float64',
    'province': 'string',  # assuming 'string' dtype if you are using Arrow string types
    'identifier': 'string',
    'identifier_type': 'string',
    'classification': 'string',
    'timestamp': 'datetime64[us]',
    'the_geom': 'object',  # 'object' for geometries or adjust to a compatible GeometryDtype
}

def filter_dwells_by_province(df):
    # Filter provinces within Dask DataFrame
    df = df[df['province'].isin(target_provinces)]
    return df

def filter_dwells_by_coords(df):
    # Create geometry from latitude and longitude
    df["the_geom"] = df.map_partitions(
        lambda part: gpd.GeoSeries([Point(xy) for xy in zip(part.centroid_longitude, part.centroid_latitude)]),
        meta=gpd.GeoSeries()
    )
    
    # Convert the DataFrame to a Dask GeoDataFrame if necessary
    df = dg.from_dask_dataframe(df)
    
    # Set geometry column
    df = df.set_geometry("the_geom")
    
    # Filter by spatial boundary
    df = df[df.within(boundaries)]
    
    return df

def process_and_save_files(month, folder_name):
    
    blobs = bucket.list_blobs(prefix=f"{folder_name}/{month}/")
    file_names = [blob.name for blob in blobs]

    output_file_path = f"{month}_full.parquet"
    
    # Remove the existing output file if present to avoid appending to an old file
    # if os.path.exists(output_file_path):
    #     os.remove(output_file_path)
    
    sorted_file_list = sorted(file_names)

    print("sorted_file_list",sorted_file_list)
    
    for file in tqdm.tqdm(sorted_file_list, total=len(sorted_file_list)):
       
        
        gcs_path = f"gcs://{bucket_name}/{file}"
        
        print(gcs_path)
        
        # Load the file lazily with Dask, applying filters if supported
        dask_df = dd.read_parquet(gcs_path, filesystem=fs, columns=target_columns, dtype=dtype_dict)

        # Apply filtering conditions
        dask_df = dask_df[dask_df["duration_seconds"] > 180]
        dask_df = filter_dwells_by_province(dask_df)  # Ensure compatibility with Dask

        # add partitioning columns
        dask_df['month'] = dask_df['timestamp'].dt.month
        # dask_geo_df = filter_dwells_by_coords(dask_df)

        # # Convert to a Dask GeoDataFrame if needed
        # dask_geo_df = dg.from_dask_dataframe(dask_df)

        # Write to output Parquet file in append mode
        dask_df.to_parquet(output_file_path, append=True,partition_on=['month'], write_index=False)

    print(f"Saved to {output_file_path}")


Please visit this URL to authorize this application: https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=1085850849712-f392q8vl3bf3uqbgc425jtrev3noajd1.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A58167%2F&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform&state=7n8icUu2RXbmLVvggnjq1QmywMftRb&access_type=offline


In [8]:
month='20230707'
# Start a Dask client (optional, adjust resources as needed)
client_dask = Client(n_workers=4, threads_per_worker=1, memory_limit='2GB')


process_and_save_files(month,folder_name )

# Close the Dask client
client_dask.close()

sorted_file_list ['activity/il/20230707/june/data000000000000.parquet', 'activity/il/20230707/june/data000000000001.parquet', 'activity/il/20230707/june/data000000000002.parquet', 'activity/il/20230707/june/data000000000003.parquet', 'activity/il/20230707/june/data000000000004.parquet', 'activity/il/20230707/june/data000000000005.parquet', 'activity/il/20230707/june/data000000000006.parquet', 'activity/il/20230707/june/data000000000007.parquet', 'activity/il/20230707/june/data000000000008.parquet', 'activity/il/20230707/june/data000000000009.parquet', 'activity/il/20230707/june/data000000000010.parquet', 'activity/il/20230707/june/data000000000011.parquet', 'activity/il/20230707/june/data000000000012.parquet', 'activity/il/20230707/june/data000000000013.parquet', 'activity/il/20230707/june/data000000000014.parquet', 'activity/il/20230707/june/data000000000015.parquet', 'activity/il/20230707/june/data000000000016.parquet', 'activity/il/20230707/june/data000000000017.parquet', 'activity/

  0%|          | 0/100 [00:00<?, ?it/s]

gcs://ingest-unacast-data/activity/il/20230707/june/data000000000000.parquet


  0%|          | 0/100 [00:00<?, ?it/s]


KeyboardInterrupt: 

2026-02-24 14:19:22,222 - distributed.nanny - ERROR - Worker process died unexpectedly
2026-02-24 14:19:22,222 - distributed.nanny - ERROR - Worker process died unexpectedly
Process Dask Worker process (from Nanny):
Process Dask Worker process (from Nanny):
2026-02-24 14:19:22,225 - distributed.nanny - ERROR - Worker process died unexpectedly
2026-02-24 14:19:22,225 - distributed.nanny - ERROR - Worker process died unexpectedly
2026-02-24 14:19:22,226 - distributed.nanny - ERROR - Worker process died unexpectedly
Process Dask Worker process (from Nanny):
Process Dask Worker process (from Nanny):
Process Dask Worker process (from Nanny):
2026-02-24 14:19:22,230 - distributed.nanny - ERROR - Worker process died unexpectedly
2026-02-24 14:19:22,231 - distributed.nanny - ERROR - Worker process died unexpectedly
2026-02-24 14:19:22,231 - distributed.nanny - ERROR - Worker process died unexpectedly
Traceback (most recent call last):
Traceback (most recent call last):
Process Dask Worker proc

In [5]:
folder_name

'activity/il'