In [13]:
# %pip install pandas python-snappy lz4 bz2file

#%pip install zlib-state

#%pip install pyarrow

%pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m28.6 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=14eb7b0d0327d3154c9b2abd1f114485b259a5fc50985b5c16b559c7842cae37
  Stored in directory: /home/sagemaker-user/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected package

In [1]:
import boto3
import pandas as pd
# Name of the S3 bucket this notebook works against (nothing is created here,
# the name is only recorded).
bucket = 'team-3-project-data-atif'
# Low-level S3 client obtained from a default boto3 Session. Despite the
# variable name, this object is a client, not a Session.
s3_session = boto3.Session().client("s3")

In [11]:
import boto3
import pandas as pd
import io
import gzip
import snappy
import lz4.frame
import bz2
import zlib
from pyarrow import csv, orc, parquet
import json

def read_csv_from_s3(bucket_name, file_key):
    """Fetch one object from S3 and return its entire body.

    Args:
        bucket_name: Name of the S3 bucket holding the object.
        file_key: Key (path) of the object inside the bucket.

    Returns:
        Whatever ``Body.read()`` yields for the object; the whole payload is
        read into memory in a single call.
    """
    # A fresh client is built on every call, using boto3's default credentials.
    client = boto3.client('s3')
    s3_object = client.get_object(Bucket=bucket_name, Key=file_key)
    return s3_object['Body'].read()

def compress_to_gzip(content):
    """Gzip-compress a bytes payload entirely in memory.

    Args:
        content: Raw bytes to compress.

    Returns:
        The complete gzip stream as bytes. The name passed to ``GzipFile``
        is the fixed string ``'output.gz'``.
    """
    sink = io.BytesIO()
    try:
        with gzip.GzipFile(filename='output.gz', mode='wb', fileobj=sink) as writer:
            writer.write(content)
        # The writer must be closed before reading the buffer, otherwise the
        # gzip trailer would be missing from the result.
        compressed = sink.getvalue()
    finally:
        sink.close()
    return compressed

def compress_to_snappy(content):
    """Compress a bytes payload with python-snappy's one-shot ``compress``.

    Args:
        content: Raw bytes to compress.

    Returns:
        The compressed payload returned by ``snappy.compress``.
    """
    # NOTE(review): snappy.compress is understood to emit the raw Snappy block
    # format rather than a framed/stream format -- confirm that whatever reads
    # the resulting ".snappy" object expects that layout.
    snappy_bytes = snappy.compress(content)
    return snappy_bytes

def compress_to_lz4(content):
    """Compress a bytes payload with ``lz4.frame.compress`` (library defaults).

    Args:
        content: Raw bytes to compress.

    Returns:
        The compressed payload returned by ``lz4.frame.compress``.
    """
    lz4_bytes = lz4.frame.compress(content)
    return lz4_bytes

def compress_to_bz2(content):
    """Compress a bytes payload to a single bzip2 stream.

    Args:
        content: Raw bytes to compress.

    Returns:
        The bzip2-compressed bytes, produced at the library's default
        compression level.
    """
    compressor = bz2.BZ2Compressor()
    head = compressor.compress(content)
    # flush() emits whatever the compressor is still buffering plus the
    # end-of-stream marker.
    return head + compressor.flush()

def compress_to_deflate(content):
    """Compress a bytes payload with zlib at its highest compression level.

    Args:
        content: Raw bytes to compress.

    Returns:
        The bytes produced by ``zlib.compress``, i.e. DEFLATE data inside the
        zlib container (header + checksum), not a bare DEFLATE stream.
    """
    best_level = zlib.Z_BEST_COMPRESSION
    return zlib.compress(content, level=best_level)

def write_compressed_to_s3(bucket_name, file_key, content, compression_format):
    """Upload a payload to S3 under the source key plus a format suffix.

    Args:
        bucket_name: Destination S3 bucket.
        file_key: Key of the original object; the upload key is derived from it.
        content: Payload to store as the object body.
        compression_format: Suffix appended after a dot, e.g. ``'gz'`` turns
            ``data.csv`` into ``data.csv.gz``.
    """
    # A fresh client is built on every call, using boto3's default credentials.
    client = boto3.client('s3')
    target_key = "{}.{}".format(file_key, compression_format)
    client.put_object(Bucket=bucket_name, Key=target_key, Body=content)


def preprocess_csv(content):
    """Blank out fields whose entire value is the literal ``NULL``.

    Only a field consisting of exactly ``NULL`` (optionally wrapped in double
    quotes) is emptied. Text that merely contains those letters, such as
    ``NULLABLE`` or ``ANNULLED``, is left untouched; a plain substring
    replacement would silently corrupt such values.

    Args:
        content: UTF-8 encoded CSV data as bytes, comma-delimited.

    Returns:
        The decoded CSV text (``str``) with whole-field ``NULL`` markers
        replaced by empty fields. Quoted markers become ``""``.
        NOTE(review): the result is text, not bytes -- callers that feed it to
        byte-oriented helpers need to ``.encode('utf-8')`` it first.

    Raises:
        UnicodeDecodeError: If ``content`` is not valid UTF-8.
    """
    import re  # local import so this block does not depend on the import cell

    # Decode bytes to string
    content_str = content.decode('utf-8')

    # A field starts after a comma or at the start of a line and ends before a
    # comma or at the end of a line (tolerating a CR before the newline).
    # The optional opening quote must be matched by the same closing quote.
    null_field = re.compile(r'(?:(?<=,)|^)("?)NULL\1(?=,|\r?$)', re.MULTILINE)

    # Keep the quotes (if any) so a quoted NULL becomes an empty quoted field.
    return null_field.sub(r'\1\1', content_str)

def convert_to_orc(content):
    """Parse CSV bytes with pyarrow and return the table serialized as ORC.

    Args:
        content: CSV data as bytes.

    Returns:
        The bytes written by ``orc.write_table`` into an in-memory buffer.
    """
    # ``csv`` here is the pyarrow reader imported in this cell, not the
    # standard-library csv module.
    arrow_table = csv.read_csv(io.BytesIO(content))
    with io.BytesIO() as orc_buffer:
        orc.write_table(arrow_table, orc_buffer)
        orc_bytes = orc_buffer.getvalue()
    return orc_bytes

def convert_to_parquet(content):
    """Parse CSV bytes with pyarrow and return the table serialized as Parquet.

    Args:
        content: CSV data as bytes.

    Returns:
        The bytes written by ``parquet.write_table`` into an in-memory buffer.
    """
    # ``csv`` here is the pyarrow reader imported in this cell, not the
    # standard-library csv module.
    arrow_table = csv.read_csv(io.BytesIO(content))
    with io.BytesIO() as parquet_buffer:
        parquet.write_table(arrow_table, parquet_buffer)
        parquet_bytes = parquet_buffer.getvalue()
    return parquet_bytes

def convert_to_json(content):
    """Convert CSV bytes into a JSON array with one object per row.

    Args:
        content: CSV data as bytes; the first row is used as the header.

    Returns:
        A JSON string in pandas' ``orient='records'`` layout, i.e.
        ``[{column: value, ...}, ...]``.
    """
    return pd.read_csv(io.BytesIO(content)).to_json(orient='records')


# Source object to re-encode.
bucket_name = 'team-3-project-data-atif'
file_key = 'Redshift-ingestion/NASA_competency_v5.csv'

# Download the raw CSV bytes from S3.
csv_content = read_csv_from_s3(bucket_name, file_key)

# Optional steps, currently disabled: NULL clean-up and conversion to
# columnar / JSON formats.
#csv_content = preprocess_csv(csv_content)
#orc_content = convert_to_orc(csv_content)
#parquet_content = convert_to_parquet(csv_content)
#json_content = convert_to_json(csv_content)

# Build every compressed variant first; nothing is uploaded until all five
# have been produced.
gz_content = compress_to_gzip(csv_content)
snappy_content = compress_to_snappy(csv_content)
lz4_content = compress_to_lz4(csv_content)
bz2_content = compress_to_bz2(csv_content)
deflate_content = compress_to_deflate(csv_content)

# Upload each variant next to the source object as "<file_key>.<suffix>".
for _payload, _suffix in (
    (gz_content, 'gz'),
    (snappy_content, 'snappy'),
    (lz4_content, 'lz4'),
    (bz2_content, 'bz2'),
    (deflate_content, 'deflate'),
):
    write_compressed_to_s3(bucket_name, file_key, _payload, _suffix)

print("Compression completed successfully.")





Compression completed successfully.
