In [12]:
from datetime import datetime
import pandas as pd
import json
import time
import logging
from sqlalchemy import create_engine
from dags.streamer import JsonStreamerPandas

In [13]:
engine = create_engine('postgresql://admin:admin@localhost:5432/movielens')
engine.connect()

<sqlalchemy.engine.base.Connection at 0x1689a68d0>

In [14]:
db_actors = pd.read_json('db_data/db_actors.jsonl', lines=True)
db_directors = pd.read_json('db_data/db_directors.jsonl', lines=True)
db_genres = pd.read_json('db_data/db_genres.jsonl', lines=True)
db_movies_actors = pd.read_json('db_data/db_movies_actors.jsonl', lines=True)
db_movies_directors = pd.read_json('db_data/db_movies_directors.jsonl', lines=True)
db_movies_genres = pd.read_json('db_data/db_movies_genres.jsonl', lines=True)
db_movies = pd.read_json('db_data/db_movies.jsonl', lines=True)

In [15]:
db_movies.to_sql('movies', engine, if_exists='replace', index=False)
db_actors.to_sql('actors', engine, if_exists='replace', index=False)
db_directors.to_sql('directors', engine, if_exists='replace', index=False)
db_genres.to_sql('genres', engine, if_exists='replace', index=False)
db_movies_actors.to_sql('movies_actors', engine, if_exists='replace', index=False)
db_movies_directors.to_sql('movies_directors', engine, if_exists='replace', index=False)
db_movies_genres.to_sql('movies_genres', engine, if_exists='replace', index=False)

500

In [16]:
rating_path = 'dags/streaming_data/streaming_ratings.jsonl'
review_path = 'dags/streaming_data/streaming_reviews.jsonl'
movie_path = 'dags/streaming_data/streaming_movies.jsonl'


db_stream_ratings = pd.read_json(rating_path, lines=True, nrows=300000)
db_stream_reviews = pd.read_json(review_path, lines=True, nrows=300000)
db_stream_movies = pd.read_json(movie_path, lines=True)

In [17]:
import re

def extract_year(title):
    """
    Extracts the year from a movie title.

    Parameters:
    - title (str): The title of the movie, which contains the year in parentheses.

    Returns:
    - int: The extracted year as an integer, or None if no year is found.
    """
    match = re.search(r'\((\d{4})\)', title)
    if match:
        return int(match.group(1))
    return None

In [18]:
def push_movies_data(data, engine):
    rate_per_second = 3  # Messages per second
    interval = 1 / rate_per_second  # Interval in seconds between messages
    movies = data[['item_id', 'title', 'imdbId', 'positive_reviews', 'negative_reviews', 'neutral_reviews']]
    movies['year'] = movies['title'].apply(extract_year)

    curr_time = time.time()
    i = 0
    while True:
        if time.time() - curr_time > 1000:  # Stream for 5 seconds
            break
        try:
             # Select the next batch of data
            new_data = movies.iloc[5*i:5*(i+1)].copy()  # Create a copy to avoid modifying the original dataframe
            
            # Add a timestamp column
            # new_data['timestamp'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            new_data.to_sql('movies', engine, if_exists='append', index=False)

            i += 1
            time.sleep(interval)  # Control the message rate
        except Exception as e:
            logging.error(f"An error occurred: {e}")
            continue

In [19]:
push_movies_data(db_stream_movies, engine)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  movies['year'] = movies['title'].apply(extract_year)


KeyboardInterrupt: 

In [20]:
def push_rating_data(data, engine):
    rate_per_second = 3  # Messages per second
    interval = 1 / rate_per_second  # Interval in seconds between messages

    curr_time = time.time()
    i = 0
    while True:
        if time.time() - curr_time > 1000:  # Stream for 5 seconds
            break
        try:
             # Select the next batch of data
            new_data = data.iloc[5*i:5*(i+1)].copy()  # Create a copy to avoid modifying the original dataframe
            
            # Add a timestamp column
            new_data['timestamp'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            new_data.to_sql('ratings', engine, if_exists='append', index=False)

            i += 1
            time.sleep(interval)  # Control the message rate
        except Exception as e:
            logging.error(f"An error occurred: {e}")
            continue

In [21]:
push_rating_data(db_stream_ratings, engine)

In [None]:
def push_review_data(data, engine):
    rate_per_second = 3  # Messages per second
    interval = 1 / rate_per_second  # Interval in seconds between messages

    curr_time = time.time()
    i = 0
    while True:
        if time.time() - curr_time > 1000:  # Stream for 5 seconds
            break
        try:
             # Select the next batch of data
            new_data = data.iloc[5*i:5*(i+1)].copy()  # Create a copy to avoid modifying the original dataframe
            
            # Add a timestamp column
            new_data['timestamp'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            new_data.to_sql('reviews', engine, if_exists='append', index=False)

            i += 1
            time.sleep(interval)  # Control the message rate
        except Exception as e:
            logging.error(f"An error occurred: {e}")
            continue