# Transform Batch/Streaming Data

In [1]:
import pandas as pd
import datetime
import time

Streaming starting date

In [2]:
pd.DataFrame({'date': ['Thursday, July 27', 'Friday, July 28', 'Saturday, July 29'], 
              'reviews': [10223, 10000, 10000]})

Unnamed: 0,date,reviews
0,"Thursday, July 27",10223
1,"Friday, July 28",10000
2,"Saturday, July 29",10000


In [3]:
df_stream_raw_samples = pd.read_parquet('../../stream_90223_raw_samples.parquet')
df_stream_raw_samples.columns

Index(['asin', 'overall', 'reviewText', 'reviewerID', 'reviewerName',
       'summary', 'verified', 'internal_partition', 'partition_number',
       'style', 'vote', 'image'],
      dtype='object')

In [4]:
batch_data_music_all = pd.read_parquet('../../batch_data_music_all_291631_noID.parquet')
batch_data_music_all.columns

Index(['asin', 'image', 'overall', 'reviewText', 'reviewerID', 'reviewerName',
       'style', 'summary', 'unixReviewTime', 'verified', 'vote'],
      dtype='object')

In [5]:
batch_data_music_all['dateReview'] = pd.to_datetime(
            batch_data_music_all['unixReviewTime'].astype(int), unit='s').dt.date

In [6]:
def get_review_id(id_pre, num_samples, ts):
    # specify padding for int string
    max_unit = 1e9
    n0s = len(str(int(max_unit)))

    if not ts.empty:
        timestamps = ts
    else:
        timestamp = f'{int(time.time()):0{n0s}}'
        timestamps = pd.Series([timestamp for _ in range(num_samples)])

    sub_ids = pd.Series(range(num_samples)).map(f'{{:0{n0s}}}'.format)
    reviews_id = id_pre + timestamps + 'T' + sub_ids

    return reviews_id

def transform(df):
    if 'unixReviewTime' in df.columns: 
        id_prefix = 'B' # Batch Data
        df['dateReview'] = pd.to_datetime(
            df['unixReviewTime'].astype(int), unit='s').dt.date# + diff_days
        
        last_batch_date = df['dateReview'].max()
        first_streaming_date = datetime.datetime.strptime('2023-07-27', "%Y-%m-%d").date()
        # old batch data, slice to present for app demo
        if last_batch_date < first_streaming_date:
            diff_days = first_streaming_date - last_batch_date - datetime.timedelta(days=1)
            df['dateReview'] = df['dateReview'] + diff_days
        timestamp = df.pop('unixReviewTime')
    else:
        id_prefix = 'S' # Stream Data
        timestamp = pd.Series()
        # Remove unnecessary columns
        df.drop(['internal_partition', 'partition_number'], axis=1, inplace=True)
        df['dateReview'] = datetime.date.today()
        
    df['overall'] = pd.to_numeric(df['overall']).astype(int)
    # Create column of review ID 
    df['reviewID'] = get_review_id(id_prefix, len(df), timestamp)
    # Replace nulls by 0s
    df.loc[df['vote'].isnull(), 'vote'] = '0'
    df['vote'] = df['vote'].str.replace(',', '').astype(int)
    # Remove unnecessary columns
    df.drop(['image', 'style'], axis=1, inplace=True)

In [7]:
transform(df_stream_raw_samples)
print(sorted(df_stream_raw_samples.columns))

['asin', 'dateReview', 'overall', 'reviewID', 'reviewText', 'reviewerID', 'reviewerName', 'summary', 'verified', 'vote']


In [8]:
transform(batch_data_music_all)
print(sorted(batch_data_music_all.columns))

TypeError: can't compare datetime.datetime to datetime.date

In [None]:
batch_data_music_all['dateReview'].sort_values(ascending=False)

Prepare streaming data for batch ingestion

In [None]:
import numpy as np

first_sampling = 10223
first_streaming_date = pd.to_datetime('2023-07-27').date()
normal_sampling = 10000
num_samples = len(df_stream_raw_samples)

samples = np.arange(first_sampling, num_samples+1, normal_sampling)
dates = pd.date_range('2023-07-28', periods=(num_samples//normal_sampling)-1)

df_stream_raw_samples.loc[:first_sampling, 'dateReview'] = first_streaming_date

for i in range(len(dates)):
    df_stream_raw_samples.loc[samples[i]:samples[i+1], 'dateReview'] = dates[i].date()

In [None]:
df_stream_raw_samples.to_parquet('stream_data_92223.parquet')

Filter by `Musical Instruments` category

In [None]:
df_musical_instruments_asins = pd.read_parquet('../../musical_instruments_asins.parquet')

In [None]:
df_stream_data_music = df_stream_raw_samples[df_stream_raw_samples['asin'].isin(
    df_musical_instruments_asins['asin'])]
df_stream_data_music.to_parquet(f'stream_data_music_{len(df_stream_data_music)}.parquet')

Concat Batch an Streaming Data for batch ingestion to db

In [None]:
full_dataset = pd.concat([batch_data_music_all, df_stream_data_music]).reset_index(drop=True)
full_dataset.to_parquet(f'batch_stream_music_data_{len(full_dataset)}.parquet')
full_dataset