In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, HashingTF, IDF
from pyspark.ml import Pipeline

# Initialize Spark Session with proper configurations
spark = SparkSession.builder \
    .appName("ChicagoNightlife") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
    .config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
    .master("local[*]") \
    .getOrCreate()

# Load and process data
posts_df = spark.read.csv('../../notebooks/venue_posts.csv', header=True, inferSchema=True)
comments_df = spark.read.csv('../../notebooks/venue_comments.csv', header=True, inferSchema=True)
users_df = spark.read.csv('../../notebooks/venue_users.csv', header=True, inferSchema=True)

# Join DataFrames
combined_df = posts_df.join(comments_df, "post_id", "left") \
    .join(users_df, posts_df.author == users_df.username, "left")

In [6]:
combined_df.head()

Row(post_id='1e0xyrh', title='XSport Fitness is sold to LA Fitness', author='Minimum_Device_6379', score='609', created_utc='2024-07-11 14:52:55', venue='CLUB ABERDEEN INC', subreddit='chicago', comment_id='lcqyaan', author='todosdelosbutts', body='This will be the death knell.', score=None, created_utc=None, username='Minimum_Device_6379', created_utc=datetime.datetime(2021, 2, 9, 23, 48, 34), comment_karma=3456, link_karma=2828, comment_id=None)

In [22]:
def clean_data(df):
    # Remove null values for available columns
    cleaned_df = df.dropna(subset=['title', 'post_id', 'author', 'score', 'created_utc', 'venue', 'subreddit'])
    
    # Convert timestamps
    cleaned_df = cleaned_df.withColumn('post_date', 
        to_timestamp(col('created_utc')))
    
    # Remove special characters and convert to lowercase for title
    cleaned_df = cleaned_df \
        .withColumn('clean_title', 
            regexp_replace(lower(col('title')), '[^a-zA-Z0-9\\s]', ''))
    
    # Remove duplicate posts
    cleaned_df = cleaned_df.dropDuplicates(['post_id'])
    
    return cleaned_df

def process_text(df):
    # Initialize text processing stages
    title_tokenizer = Tokenizer(inputCol="clean_title", outputCol="title_tokens")
    
    # Remove stop words
    stop_words = StopWordsRemover.loadDefaultStopWords("english")
    title_remover = StopWordsRemover(inputCol="title_tokens", 
                                    outputCol="filtered_title_tokens",
                                    stopWords=stop_words)
    
    # TF-IDF for title
    title_hashingTF = HashingTF(inputCol="filtered_title_tokens", 
                               outputCol="title_tf", numFeatures=1000)
    
    title_idf = IDF(inputCol="title_tf", outputCol="title_features")
    
    # Create and apply pipeline
    pipeline = Pipeline(stages=[
        title_tokenizer,
        title_remover,
        title_hashingTF,
        title_idf
    ])
    
    return pipeline.fit(df).transform(df)

def engineer_features(df):
    # Add time-based features
    df = df.withColumn('hour_of_day', hour('post_date')) \
           .withColumn('day_of_week', dayofweek('post_date')) \
           .withColumn('month', month('post_date'))
    
    # Calculate engagement metrics (assuming comment_karma is available)
    if 'comment_karma' in df.columns:
        df = df.withColumn('engagement_score', 
            col('score') + col('comment_karma'))
    else:
        df = df.withColumn('engagement_score', col('score'))
    
    # Calculate text lengths
    df = df.withColumn('title_length', 
            size(col('filtered_title_tokens')))
    
    # Create venue popularity metrics
    venue_metrics = df.groupBy('venue') \
        .agg(avg('engagement_score').alias('avg_venue_engagement'),
             count('*').alias('post_count'))
    
    # Join venue metrics back to main dataframe
    df = df.join(venue_metrics, 'venue', 'left')
    
    return df


In [24]:
def main_pipeline():
    # Load and combine data
    print("Loading data...")
    combined_df = spark.read.csv('../../notebooks/venue_posts.csv', header=True, inferSchema=True)
    
    # Clean data
    print("Cleaning data...")
    cleaned_df = clean_data(combined_df)
    
    # Process text
    print("Processing text...")
    processed_df = process_text(cleaned_df)
    
    # Engineer features
    print("Engineering features...")
    final_df = engineer_features(processed_df)
    
    # Save processed data
    print("Saving processed data...")
    final_df.write.mode('overwrite').parquet('processed_data.parquet')
    
    return final_df


In [26]:
if __name__ == "__main__":
    final_df = main_pipeline()
    print("Pipeline completed successfully!")

Loading data...
Cleaning data...
Processing text...


                                                                                

Engineering features...
Saving processed data...
Pipeline completed successfully!


In [32]:
%pip install textblob

Collecting textblob
  Downloading textblob-0.18.0.post0-py3-none-any.whl.metadata (4.5 kB)
Downloading textblob-0.18.0.post0-py3-none-any.whl (626 kB)
[2K   [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m626.3/626.3 kB[0m [31m9.2 MB/s[0m eta [36m0:00:00[0m[31m4.4 MB/s[0m eta [36m0:00:01[0m
[?25hInstalling collected packages: textblob
Successfully installed textblob-0.18.0.post0
Note: you may need to restart the kernel to use updated packages.


In [62]:
def format_date(df):
    # Filter out invalid datetime values and convert
    df = df[df['created_utc'].str.match(r'\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}', na=False)]
    df['created_utc'] = pd.to_datetime(df['created_utc'], format='%Y-%m-%d %H:%M:%S', errors='coerce')
    df = df.dropna(subset=['created_utc'])
    df['created_utc'] = df['created_utc'].dt.strftime('%Y-%m-%dT%H:%M:%S')

    return df

SyntaxError: incomplete input (719056565.py, line 1)

In [60]:
from neo4j import GraphDatabase
from textblob import TextBlob
import pandas as pd

class ChicagoNightlifeAnalytics:
    def __init__(self, uri="bolt://localhost:7687", user="neo4j", password="yourpassword"):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
    
    def load_parquet_to_neo4j(self):
        df = pd.read_parquet('processed_data.parquet')

        df = format_date(df)
        
        with self.driver.session() as session:
            # Create Post nodes first
            session.run("""
                UNWIND $rows AS row
                CREATE (p:Post {
                    post_id: row.post_id,
                    title: row.title,
                    score: row.score,
                    created_utc: datetime(row.created_utc),
                    engagement_score: row.engagement_score
                })
            """, rows=df.to_dict('records'))
            
            # Create Venue nodes and relationships
            session.run("""
                UNWIND $rows AS row
                MERGE (v:Venue {name: row.venue})
                WITH v, row
                MATCH (p:Post {post_id: row.post_id})
                CREATE (p)-[:POSTED_AT]->(v)
            """, rows=df.to_dict('records'))
            
            # Create User nodes and relationships
            session.run("""
                UNWIND $rows AS row
                MERGE (u:User {username: row.author})
                WITH u, row
                MATCH (p:Post {post_id: row.post_id})
                CREATE (u)-[:AUTHORED]->(p)
            """, rows=df.to_dict('records'))

    def close(self):
        self.driver.close()


def generate_analytics_output(analytics):
    results = {}
    
    with analytics.driver.session() as session:
        # Sentiment Analysis
        sentiment_query = """
            MATCH (v:Venue)<-[:POSTED_AT]-(p:Post)
            WITH v.name as venue, 
                 toFloat(p.score) as numeric_score,
                 count(p) as post_count
            WHERE post_count > 5
            RETURN venue, 
                   avg(numeric_score) as sentiment_score, 
                   post_count
            ORDER BY sentiment_score DESC
            LIMIT 10
        """
        results['sentiment'] = pd.DataFrame([dict(record) 
            for record in session.run(sentiment_query)])
        
        # Trend Detection
        trend_query = """
            MATCH (v:Venue)<-[:POSTED_AT]-(p:Post)
            WHERE p.created_utc > datetime() - duration('P7D')
            WITH v.name as venue,
                 count(p) as recent_posts,
                 toFloat(p.engagement_score) as numeric_engagement
            RETURN venue, 
                   recent_posts, 
                   avg(numeric_engagement) as avg_engagement
            ORDER BY recent_posts DESC
            LIMIT 5
        """
        results['trends'] = pd.DataFrame([dict(record) 
            for record in session.run(trend_query)])
        
        # Geographic Analysis
        geo_query = """
            MATCH (v:Venue)<-[:POSTED_AT]-(p:Post)
            WITH v.name as venue,
                 count(p) as activity_count,
                 toFloat(p.score) as numeric_score
            RETURN venue, 
                   activity_count,
                   avg(numeric_score) as avg_score
            ORDER BY activity_count DESC
        """
        results['geographic'] = pd.DataFrame([dict(record) 
            for record in session.run(geo_query)])
        
        # Pattern Recognition
        pattern_query = """
            MATCH (u:User)-[:AUTHORED]->(p:Post)-[:POSTED_AT]->(v:Venue)
            WITH u.username as user, collect(distinct v.name) as venues
            WHERE size(venues) > 2
            RETURN user, venues, size(venues) as venue_count
            ORDER BY venue_count DESC
            LIMIT 10
        """
        results['patterns'] = pd.DataFrame([dict(record) 
            for record in session.run(pattern_query)])
    
    return results


{'sentiment':                 venue  sentiment_score  post_count
0               After           3931.0          45
1    Gio's Sports Bar           1942.0          45
2  RED ROOSTER TAVERN           1899.5          45
3             BELUGAS           1389.5          45
4             JUST US           1143.0          45
5          SHADOW BAR           1135.5          45
6             ESTEREO            953.5          45
7               Metro            860.5          45
8  SWEET BOB'S LOUNGE            851.0          45
9        4TH DISTRICT            832.0          45, 'trends': Empty DataFrame
Columns: []
Index: [], 'geographic':                                     venue  activity_count  avg_score
0                           RIVER SHANNON             135        0.0
1                TIMELINE THEATRE COMPANY             135        2.0
2                        SLEEPING VILLAGE              90        3.0
3                         TIME OUT LOUNGE              90        2.0
4               

In [None]:
if __name__ == "__main__":
    analytics = ChicagoNightlifeAnalytics()
    
    analytics.load_parquet_to_neo4j()
    
    results = generate_analytics_output(analytics)

    print(results)
    
    analytics.close()

In [66]:
results

{'sentiment':                 venue  sentiment_score  post_count
 0               After           3931.0          45
 1    Gio's Sports Bar           1942.0          45
 2  RED ROOSTER TAVERN           1899.5          45
 3             BELUGAS           1389.5          45
 4             JUST US           1143.0          45
 5          SHADOW BAR           1135.5          45
 6             ESTEREO            953.5          45
 7               Metro            860.5          45
 8  SWEET BOB'S LOUNGE            851.0          45
 9        4TH DISTRICT            832.0          45,
 'trends': Empty DataFrame
 Columns: []
 Index: [],
 'geographic':                                     venue  activity_count  avg_score
 0                           RIVER SHANNON             135        0.0
 1                TIMELINE THEATRE COMPANY             135        2.0
 2                        SLEEPING VILLAGE              90        3.0
 3                         TIME OUT LOUNGE              90        2.

In [70]:
results["sentiment"]

Unnamed: 0,venue,sentiment_score,post_count
0,After,3931.0,45
1,Gio's Sports Bar,1942.0,45
2,RED ROOSTER TAVERN,1899.5,45
3,BELUGAS,1389.5,45
4,JUST US,1143.0,45
5,SHADOW BAR,1135.5,45
6,ESTEREO,953.5,45
7,Metro,860.5,45
8,SWEET BOB'S LOUNGE,851.0,45
9,4TH DISTRICT,832.0,45


In [72]:
results["geographic"]

Unnamed: 0,venue,activity_count,avg_score
0,RIVER SHANNON,135,0.0
1,TIMELINE THEATRE COMPANY,135,2.0
2,SLEEPING VILLAGE,90,3.0
3,TIME OUT LOUNGE,90,2.0
4,THE MID DAY CLUB,90,0.0
...,...,...,...
131,EUGENE S GEMBARA,45,31.0
132,Five Iron Golf Chicago,45,30.0
133,D'AGOSTINOS PIZZA,45,17.0
134,ESTATE,45,0.0
