In [2]:
import os
import pandas as pd
from pathlib import Path

In [20]:
def load(filename: str) -> pd.DataFrame:
    
    df_iter = pd.read_csv(filename, iterator=True, chunksize=10000)
    return df_iter

In [102]:
def fetch(df_iter: pd.DataFrame) -> pd.DataFrame:
    """Read one chunk"""
    df = next(df_iter)
    df = df[df['language'] == 'english']
    df = df.replace('\\n', '', regex = True)
    df = df.reset_index(drop=True)
    return df

In [3]:
source_path = "/home/aliciescont/Documents/Github/dezoomcamp-project/data/steam_reviews.csv"
df_iter = pd.read_csv(source_path, iterator=True, chunksize=500000, index_col=0)

df = next(iter(df_iter))


In [104]:
source_path = "/home/aliciescont/Documents/Github/dezoomcamp-project/data/steam_reviews.csv"
df_iter = pd.read_csv(source_path, iterator=True, chunksize=500000, index_col=0)

index_file = 0
while True:
    try:
        index_file += 1
        df_next = fetch(df_iter)
        print(df_next.head())
        path = Path(f"{source_path}_{index_file}.csv") 
        df_next.to_csv(path,index=False)
    except StopIteration:
        print("Finished ingesting data into the postgres database")
        break 

   app_id                  app_name  review_id language  \
0  292030  The Witcher 3: Wild Hunt   85184605  english   
1  292030  The Witcher 3: Wild Hunt   85184171  english   
2  292030  The Witcher 3: Wild Hunt   85184064  english   
3  292030  The Witcher 3: Wild Hunt   85180436  english   
4  292030  The Witcher 3: Wild Hunt   85179753  english   

                                              review  timestamp_created  \
0  One of the best RPG's of all time, worthy of a...         1611379970   
1             good story, good graphics. lots to do.         1611379264   
2                                           dis gud,         1611379091   
3  favorite game of all time cant wait for the Ne...         1611373086   
4                          Why wouldn't you get this         1611371978   

   timestamp_updated  recommended  votes_helpful  votes_funny  ...  \
0         1611379970         True              0            0  ...   
1         1611379264         True              0      

In [5]:
source_path = "/home/aliciescont/Documents/Github/dezoomcamp-project/data/steam_reviews.csv"
df_iter = pd.read_csv(source_path, iterator=True, chunksize=500000, index_col=0)
for chunk in df_iter:
    print(chunk['review_id'].head())

0    85185598
1    85185250
2    85185111
3    85184605
4    85184287
Name: review_id, dtype: int64
500001    49797168
500002    49794752
500003    49792060
500004    49789951
500005    49774359
Name: review_id, dtype: int64
1000001    71813207
1000002    71812793
1000003    71812562
1000004    71812466
1000005    71812190
Name: review_id, dtype: int64
1500001    11020694
1500002    11020664
1500003    11020579
1500004    11020489
1500005    11020478
Name: review_id, dtype: int64
2000001    46558987
2000002    46558911
2000003    46558893
2000004    46558731
2000005    46558581
Name: review_id, dtype: int64
2500001    65135149
2500002    65134915
2500003    65134596
2500004    65134520
2500005    65134436
Name: review_id, dtype: int64
3000001    13749986
3000002    13749898
3000003    13749755
3000004    13749409
3000005    13749401
Name: review_id, dtype: int64
3500001    71410139
3500002    71404778
3500003    71403172
3500004    71401274
3500005    71393256
Name: review_id, dtype: i

In [107]:
pd.io.json.build_table_schema(df)

{'fields': [{'name': 'index', 'type': 'integer'},
  {'name': 'Unnamed: 0', 'type': 'integer'},
  {'name': 'app_id', 'type': 'integer'},
  {'name': 'app_name', 'type': 'string'},
  {'name': 'review_id', 'type': 'integer'},
  {'name': 'language', 'type': 'string'},
  {'name': 'review', 'type': 'string'},
  {'name': 'timestamp_created', 'type': 'integer'},
  {'name': 'timestamp_updated', 'type': 'integer'},
  {'name': 'recommended', 'type': 'boolean'},
  {'name': 'votes_helpful', 'type': 'integer'},
  {'name': 'votes_funny', 'type': 'integer'},
  {'name': 'weighted_vote_score', 'type': 'number'},
  {'name': 'comment_count', 'type': 'integer'},
  {'name': 'steam_purchase', 'type': 'boolean'},
  {'name': 'received_for_free', 'type': 'boolean'},
  {'name': 'written_during_early_access', 'type': 'boolean'},
  {'name': 'author.steamid', 'type': 'integer'},
  {'name': 'author.num_games_owned', 'type': 'integer'},
  {'name': 'author.num_reviews', 'type': 'integer'},
  {'name': 'author.playtime_f

In [3]:
df_iter = pd.read_csv("/home/aliciescont/Documents/Github/dezoomcamp-project/data/steam_reviews.csv", iterator=True, chunksize=10000)
    

In [52]:
df = next(df_iter)
print(df['review_id'].head())

340000    37885649
340001    37885461
340002    37885284
340003    37885230
340004    37885130
Name: review_id, dtype: int64


In [80]:
def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Fix dtype issues"""

    df["timestamp_created"] = pd.to_datetime(df["timestamp_updated"], utc=True, unit='s')
    df["timestamp_updated"] = pd.to_datetime(df["timestamp_updated"], utc=True, unit='s')
    print(df.head(2))
    print(f"columns: {df.dtypes}")
    print(f"rows: {len(df)}")
    return df

In [8]:
def batch_to_parquet(df: pd.DataFrame, dataset_file: str, index_file: int) -> Path:
    """Write DataFrame out locally as parquet file"""
    path = Path(f"{dataset_file}_{index_file}.csv") 
    df.to_csv(path)
    return path

In [84]:
def etl_web_to_gcs() -> None:
    """The main ETL function"""
    
    source_path = "/home/aliciescont/Documents/Github/dezoomcamp-project/data/steam_reviews.csv"
  
    df = fetch(source_path)
    index_file = 0
    output_path = Path(f"data/steam_reviews_{index_file}.csv")
    while True:
        try:
            index_file += 1
            df_next = fetch(df_iter)
            print(df_next.head())
            path = batch_to_parquet(df, source_path, index_file)

        except StopIteration:
            print("Finished ingesting data into the postgres database")
            break 
    #df_clean = clean(df)
    return path

In [85]:
etl_web_to_gcs()

TypeError: 'str' object is not an iterator

In [26]:
    index_file = 0
    while True:
        index_file += 1
        try:   
            etl_web_to_gcs(index_file)

        except StopIteration:
            print("Finished ingesting data into the postgres database")
            break 

KeyboardInterrupt: 