In [1]:
from minio import Minio
from minio.error import S3Error
import io
import pyarrow as pa
import pyarrow.parquet as pq
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

In [2]:
#conn_minio
# Import only the helper this cell needs: a wildcard import would pull every
# public name of conn_minio into the notebook namespace and could silently
# rebind names imported earlier (e.g. S3Error).
from conn_minio import connect_minio
minioClient = connect_minio()

#try catch list buckets
# Listing the buckets prints each bucket's name and creation date; an S3-level
# failure is printed instead of raising.
try:
    buckets = minioClient.list_buckets()
    for bucket in buckets:
        print(bucket.name, bucket.creation_date)
except S3Error as err:
    print(err)


bronze 2024-09-16 15:51:21.440000+00:00


In [3]:
# NOTE(review): disabled cell - every line below is commented out, so running
# it has no effect. When enabled it reads only the 'review' collection of the
# 'yelp_dataset' database and previews one row; the loop in the following cell
# performs the same read for every collection.
# #conn_mongo
# from conn_mongodb import connect_mongodb
# mongo_url = 'mongodb://127.0.0.1/'
# spark = connect_mongodb()

# bronze_df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
# .option('spark.mongodb.input.uri', mongo_url) \
# .option('spark.mongodb.input.database', 'yelp_dataset') \
# .option('spark.mongodb.input.collection', 'review') \
# .load()

# bronze_df.head(1)

In [4]:
# Export each MongoDB collection of the 'yelp_dataset' database to a local
# Parquet directory at parquet/<collection>.
from conn_mongodb import connect_mongodb

collections = ['business', 'checkin', 'tip', 'user', 'review']
mongo_url = 'mongodb://127.0.0.1/'
# presumably returns a SparkSession configured with the MongoDB connector - verify in conn_mongodb
spark = connect_mongodb()

for collection in collections:
    # Failures are reported per collection so one bad collection does not
    # stop the remaining exports.
    try:
        print(f"Processing collection: {collection}")
        bronze_df = (
            spark.read.format("com.mongodb.spark.sql.DefaultSource")
            .option('spark.mongodb.input.uri', mongo_url)
            .option('spark.mongodb.input.database', 'yelp_dataset')
            .option('spark.mongodb.input.collection', collection)
            .load()
        )
        # repartition(1) collapses the data into a single partition so one
        # part file is written. "overwrite" (not "append") keeps the cell
        # idempotent: the whole collection is re-read on every run, so
        # appending would pile up duplicate part files in the directory.
        bronze_df.repartition(1).write.format("parquet").mode("overwrite").save(f'parquet/{collection}')
        print(f"Successfully processed collection: {collection}")
    except Exception as e:
        print(f"Error processing collection {collection}: {e}")


Processing collection: business
Successfully processed collection: business
Processing collection: checkin
Successfully processed collection: checkin
Processing collection: tip
Successfully processed collection: tip
Processing collection: user
Successfully processed collection: user
Processing collection: review
Successfully processed collection: review


In [10]:
#rename parquet files
import os
import shutil

# Give the Spark-generated part file of every collection a stable name:
# parquet/<collection>/<collection>.parquet
for collection in collections:
    try:
        collection_dir = os.path.join('parquet', collection)
        target_name = f'{collection}.parquet'
        # Ignore the already-renamed target so re-running this cell is harmless.
        part_files = sorted(
            name for name in os.listdir(collection_dir)
            if name.endswith(".parquet") and name != target_name
        )
        if len(part_files) > 1:
            # Renaming several part files onto one name would silently keep
            # only the last one processed; refuse instead of guessing.
            raise RuntimeError(
                f"expected a single parquet part file, found {len(part_files)}: {part_files}"
            )
        if not part_files:
            print(f"Nothing to rename for collection: {collection}")
        for file_name in part_files:
            print(f"Processing collection: {collection}")
            # os.replace overwrites an existing target on every platform,
            # whereas os.rename fails on Windows when the target exists.
            os.replace(
                os.path.join(collection_dir, file_name),
                os.path.join(collection_dir, target_name),
            )
            print(f"Successfully renamed collection: {collection}")
    except Exception as e:
        print(f"Error renaming collection {collection}: {e}")

Processing collection: business
Successfully renamed collection: business
Processing collection: checkin
Successfully renamed collection: checkin
Processing collection: tip
Successfully renamed collection: tip
Processing collection: user
Successfully renamed collection: user
Processing collection: review
Successfully renamed collection: review


In [11]:
#push to minio
# Upload the renamed Parquet file of every collection to the bucket below,
# stored under the object key "<collection>.parquet".
bucket_name = 'bronze'

for collection in collections:
    # Local source path and destination object key for this collection.
    file = f'parquet/{collection}/{collection}.parquet'
    obj = f'{collection}.parquet'
    try:
        minioClient.fput_object(bucket_name, obj, file)
    except Exception as e:
        # Report the failure and carry on with the next collection.
        print(f"Error pushing collection {collection} to minio: {e}")
    else:
        print(f"Successfully pushed collection: {collection} to minio")


Successfully pushed collection: business to minio
Successfully pushed collection: checkin to minio
Successfully pushed collection: tip to minio
Successfully pushed collection: user to minio
Successfully pushed collection: review to minio
