In [1]:
#!/usr/bin/env python
import pandas as pd
from datetime import datetime
from data_mountain_query.counters import AmbientTweetCounters, Counters
from data_mountain_query.sentiment import load_happs_scores, df_sentiment
from data_mountain_query.counters import lang_dict
from data_mountain_query.connection import get_connection
from data_mountain_query.query import get_tweets
from data_mountain_query.parsers import load_ngrams_parser, parse_ngrams_tweet
import random
import geopandas as gpd
from shapely.geometry import Point
import numpy as np
import fiona
from pathlib import Path
import os

# ---------------------------------------------------
# Import your project-specific modules as needed.
# ---------------------------------------------------
# from data_mountain_query.connection import get_connection
# from data_mountain_query.query import get_tweets
# from data_mountain_query.sentiment import load_happs_scores, df_sentiment
# from data_mountain_query.parsers import load_ngrams_parser, parse_ngrams_tweet
# from data_mountain_query.counters import lang_dict

gdb_path = "/gpfs2/scratch/pwormser/research/SmartLocationDatabase_copy.gdb"
cache_file = "block_groups.pkl"

if os.path.exists(cache_file):
    block_groups = gpd.GeoDataFrame(pd.read_pickle(cache_file))
    print("Loaded block_groups from cache.")
else:
    # List available layers if needed:
    import fiona
    layers = fiona.listlayers(gdb_path)
    print("Available layers:", layers)
    
    block_groups = gpd.read_file(gdb_path, layer="EPA_SLD_Database_V3")
    # Optionally reproject if needed:
    block_groups = block_groups.to_crs("EPSG:4326")
    # Cache the result for future runs:
    block_groups.to_pickle(cache_file)
    print("Read block_groups from GDB and cached it.")

# ----------------------------
# Utility Functions
# ----------------------------

def extract_lat_lon(df):
    # Instead of making a full copy, we use .loc assignment directly.
    df['latitude'] = df['geo'].apply(lambda g: g.get('coordinates', [None, None])[1])
    df['longitude'] = df['geo'].apply(lambda g: g.get('coordinates', [None, None])[0])
    return df  # Now df is modified in-place.

def filter_us_tweets(df):
    # In-place boolean indexing returns a copy already.
    continental = ((df['latitude'] >= 24.396308) & (df['latitude'] <= 49.384358) &
                   (df['longitude'] >= -125.0) & (df['longitude'] <= -66.93457))
    alaska = ((df['latitude'] >= 51.2) & (df['latitude'] <= 71.4) &
              (df['longitude'] >= -179.1) & (df['longitude'] <= -129.9))
    hawaii = ((df['latitude'] >= 18.9) & (df['latitude'] <= 22.2) &
              (df['longitude'] >= -160.3) & (df['longitude'] <= -154.8))
    us_mask = continental | alaska | hawaii
    return df[us_mask]  # This creates a new DataFrame (which is unavoidable)

def refine_with_us_boundary(df):
    # Build GeoDataFrame without extra copying if possible.
    geometry = [Point(lon, lat) for lon, lat in zip(df['longitude'], df['latitude'])]
    gdf = gpd.GeoDataFrame(df, geometry=geometry, crs="EPSG:4326")
    # Reproject in place (returns a new GeoDataFrame, but we assign it immediately)
    gdf = gdf.to_crs(block_groups.crs)
    us_boundary_reproj = us_boundary.to_crs(gdf.crs)
    joined = gpd.sjoin(gdf, us_boundary_reproj, how="inner", predicate="within")
    joined = joined.reset_index(drop=True)
    # Only drop geometry and the added index_right
    return joined.drop(columns=["geometry", "index_right"], errors='ignore')

def tag_tweets_with_geodata(df):
    target_crs = block_groups.crs
    zip_shp = '/gpfs2/scratch/pwormser/research/USA_Boundaries_2022_-232574676275878974/USA_ZipCode.shp'
    zip_codes_local = gpd.read_file(zip_shp).to_crs(target_crs)
    if 'index_right' in zip_codes_local.columns:
        zip_codes_local.drop(columns=['index_right'], inplace=True)
    
    bg = block_groups  # using global block_groups
    if 'index_right' in bg.columns:
        bg = bg.drop(columns=['index_right'])
    
    # Create GeoDataFrame from df without extra copy if possible.
    points_gdf = gpd.GeoDataFrame(df, geometry=[Point(xy) for xy in zip(df['longitude'], df['latitude'])], crs="EPSG:4326")
    points_gdf = points_gdf.to_crs(target_crs)
    if 'index_right' in points_gdf.columns:
        points_gdf.drop(columns=['index_right'], inplace=True)
    
    # Spatial join with block groups.
    points_bg = gpd.sjoin(points_gdf, bg[['GEOID10', 'geometry']], how="left", predicate="within")
    if 'index_right' in points_bg.columns:
        points_bg.drop(columns=['index_right'], inplace=True)
    
    # Spatial join with ZIP codes.
    points_zip = gpd.sjoin(points_gdf, zip_codes_local[['ZIP_CODE', 'geometry']], how="left", predicate="within")
    if 'index_right' in points_zip.columns:
        points_zip.drop(columns=['index_right'], inplace=True)
    
    # Assign new columns into df without copying the whole DataFrame.
    df['GEOID10'] = points_bg['GEOID10'].values
    df['ZIP_Code'] = points_zip['ZIP_CODE'].values
    return df

def merge_counters(counter_list):
    aggregated = {}
    for counter in counter_list:
        for word, counts in counter.items():
            if word not in aggregated:
                aggregated[word] = counts.copy()
            else:
                for key, value in counts.items():
                    aggregated[word][key] = aggregated[word].get(key, 0) + value
    return aggregated

def aggregate_counters_by_block(df):
    df['counters'] = df['counters'].apply(lambda x: x if isinstance(x, dict) else {})
    grouped = df.groupby('GEOID10', as_index=False)['counters'].agg(
        lambda counters: merge_counters(list(counters))
    )
    grouped.columns = ['GEOID10', 'aggregated_counters']
    return grouped

def process_chunk(chunk_df):
    chunk_df = extract_lat_lon(chunk_df)
    chunk_df = filter_us_tweets(chunk_df)
    chunk_df = refine_with_us_boundary(chunk_df)
    chunk_df = tag_tweets_with_geodata(chunk_df)
    agg_df = aggregate_counters_by_block(chunk_df)
    # Force agg_df to be a DataFrame by resetting the index if necessary:
    if isinstance(agg_df, pd.Series):
        agg_df = agg_df.reset_index()
    return {row['GEOID10']: row['aggregated_counters'] for _, row in agg_df.iterrows()}

# ----------------------------
# Main Processing Pipeline
# ----------------------------
if __name__ == '__main__':
    start_date = datetime(2016, 5, 1)
    end_date = datetime(2016, 5, 2)
    lang = 'en'
    
    collection, client = get_connection(geotweets=True)
    query = {'tweet_created_at': {'$gte': start_date, '$lt': end_date}, "fastText_lang": "en"}
    
    tweets = get_tweets(collection, query, limit=25)
    
    # Add the "counters" key using the n-gram parser.
    ngrams_parser = load_ngrams_parser()
    tweets = [{**tweet, "counters": parse_ngrams_tweet(tweet, ngrams_parser)} for tweet in tweets]
    df_all = pd.DataFrame(list(tweets))
    
    global us_boundary
    us_boundary = gpd.read_file('/gpfs2/scratch/pwormser/research/cb_2018_us_nation_20m/cb_2018_us_nation_20m.shp')
    us_boundary = us_boundary.to_crs("EPSG:4326")
    
    word2score = load_happs_scores(lang=lang_dict[lang])
    df_all = df_sentiment(df_all, word2score=word2score)
    
    # Process tweets in chunks
    chunk_size = 1000  # For testing; use 1000+ for production.
    all_agg = {}
    for i in range(0, len(df_all), chunk_size):
        chunk_df = df_all.iloc[i : i + chunk_size]
        chunk_result = process_chunk(chunk_df)
        for geoid, counters in chunk_result.items():
            if geoid in all_agg:
                all_agg[geoid] = merge_counters([all_agg[geoid], counters])
            else:
                all_agg[geoid] = counters
    
    final_agg = pd.DataFrame({
        'GEOID10': list(all_agg.keys()),
        'aggregated_counters': list(all_agg.values())
    })
    
    pd.set_option('display.max_colwidth', None)
    print(final_agg)



Loaded block_groups from cache.
Connecting on dm-mongovm-001


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['latitude'] = df['geo'].apply(lambda g: g.get('coordinates', [None, None])[1])
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['longitude'] = df['geo'].apply(lambda g: g.get('coordinates', [None, None])[0])


        GEOID10  \
0  130639800001   
1  420370513001   
2  481130174001   
3  510594809021   
4  510594822034   

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            aggregated_counters  
0  