# In [None]:
# Import necessary libraries
import concurrent.futures
import threading
import os
import subprocess
from datetime import datetime, timedelta
import gzip

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, lit, sum
from pyspark.sql.types import IntegerType, DateType, StringType


# Define function to append a timestamped message to the log file
def log_file(message):
    """Append ``message`` to the log file, prefixed with the current local time.

    The destination is the module-level ``filelog`` path, which has to be
    assigned before the first call. Every entry is written on its own line
    in the form ``YYYY-MM-DD HH:MM:SS---message``.

    Returns:
        The ``filelog`` path that was written to.
    """
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    with open(filelog, 'a') as handle:
        handle.write("---".join((stamp, message)) + "\n")
    return filelog

# Define function to create url
def create_url(year=2016, month=1, date=1, time=1):
    """Build the Wikimedia hourly pageviews dump URL for one hour.

    Each argument is converted with ``int`` and zero-padded: ``year`` to four
    digits, ``month``, ``date`` (day of month) and ``time`` (hour) to two.

    Returns:
        The URL of the ``pageviews-YYYYMMDD-HH0000.gz`` archive.
    """
    yyyy = format(int(year), '04')
    mm = format(int(month), '02')
    dd = format(int(date), '02')
    hh = format(int(time), '02')
    return f'https://dumps.wikimedia.org/other/pageviews/{yyyy}/{yyyy}-{mm}/pageviews-{yyyy}{mm}{dd}-{hh}0000.gz'


# Define function to download file from url
def download_gzfiles(gzfolder_path, url, downloaded_files_lock, downloaded_files):
    """Download one archive with ``curl`` unless it is already present.

    Args:
        gzfolder_path: Directory the archive is saved into.
        url: Source URL; its last path segment becomes the local file name.
        downloaded_files_lock: Lock guarding ``downloaded_files``.
        downloaded_files: Set of local paths fetched so far; the new path is
            added to it after a successful download.

    Returns:
        None. A failed download is printed and passed to ``log_file`` instead
        of being raised, and any partially written file is removed.
    """
    gzfile_name = url.split("/")[-1]
    gzfile_path = os.path.join(gzfolder_path, gzfile_name)

    # Only the set lookup needs the lock; the filesystem check does not.
    with downloaded_files_lock:
        already_tracked = gzfile_path in downloaded_files

    if already_tracked or os.path.exists(gzfile_path):
        print(f"File {gzfile_path} already exists. Skipping download.")
        return

    try:
        print(f'{gzfile_name} is downloading ...')
        # --fail: exit non-zero on HTTP errors (e.g. 404) instead of saving
        #         the server's error page as if it were the archive.
        # --location: follow redirects to the real file.
        subprocess.run(
            ["curl", "--fail", "--location", "-o", gzfile_path, url],
            check=True,
        )
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing/unrunnable curl binary.
        # Drop whatever was written so that a later run does not mistake a
        # partial file for a finished download and skip it.
        try:
            os.remove(gzfile_path)
        except FileNotFoundError:
            pass  # nothing was written; nothing to clean up
        except OSError as cleanup_error:
            print(f"Could not remove partial file {gzfile_path}: {cleanup_error}")
        print(f"Error downloading {url}: {e}")
        log_file(f"Error downloading {url}: {e}")
        return

    with downloaded_files_lock:
        downloaded_files.add(gzfile_path)  # Mark the file as downloaded
    # The archive is stored compressed; no decompression happens here.
    print(f"Download complete: {gzfile_path}")

# Define function to download the URLs in parallel threads
def execute_download_gzfiles(res_url, gzfolder_path, batch_size=12):
    """Download every URL in ``res_url`` into ``gzfolder_path`` using threads.

    The URLs are handled in consecutive batches of ``batch_size``; each batch
    is downloaded by a pool of three worker threads and must finish before
    the next batch starts.

    Args:
        res_url: Iterable of URLs to fetch.
        gzfolder_path: Destination directory; created if it does not exist.
        batch_size: Number of URLs submitted per batch (must be >= 1).

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    urls = list(res_url)  # accept any iterable, not only sliceable sequences
    # curl cannot create the parent directory of its output file.
    os.makedirs(gzfolder_path, exist_ok=True)

    downloaded_files = set()  # Set to track downloaded files
    downloaded_files_lock = threading.Lock()  # Guards downloaded_files across threads

    for idx in range(0, len(urls), batch_size):
        batch = urls[idx: idx + batch_size]
        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            pending = {
                executor.submit(
                    download_gzfiles, gzfolder_path, url,
                    downloaded_files_lock, downloaded_files,
                ): url
                for url in batch
            }
            # Inspect every future: an exception raised inside a worker is
            # otherwise stored on the future and never seen by anyone.
            for future in concurrent.futures.as_completed(pending):
                error = future.exception()
                if error is not None:
                    print(f"Error downloading {pending[future]}: {error!r}")

# Define function to check that a downloaded .gz file is not corrupted
def is_valid_gz(gzfile_path):
    """Return True when ``gzfile_path`` is a complete, readable gzip archive.

    The whole stream is decompressed in chunks (the output is discarded), so
    truncated downloads and corrupt data are detected, not just a bad header.
    A zero-byte file is reported as invalid: the gzip reader treats it as a
    clean end of file and raises nothing, so it would otherwise pass as valid.

    Returns:
        True if the file decompresses to the end without error, else False
        (including when the file is missing or unreadable).
    """
    import zlib  # local import: only needed for the zlib.error type below

    try:
        if os.path.getsize(gzfile_path) == 0:
            return False
        with gzip.open(gzfile_path, 'rb') as gz_check:
            # Read to the end of the stream, 1 MiB at a time.
            while gz_check.read(1024 * 1024):
                pass
        return True
    except (OSError, EOFError, zlib.error):
        # OSError   -> missing/unreadable file or gzip.BadGzipFile (a subclass)
        # EOFError  -> stream ended before the end-of-stream marker (truncated)
        # zlib.error -> corrupt compressed data
        return False
         
# Define function to check that the downloaded .gz files are valid and that there are enough of them for one day (24 files, one per hour)
def _infer_day_urls(gzfile_names):
    """Rebuild the 24 hourly URLs for the single day named in ``gzfile_names``."""
    days = set()
    for name in gzfile_names:
        # Expected shape: pageviews-YYYYMMDD-HHMMSS.gz
        parts = name[:-len('.gz')].split('-')
        if len(parts) == 3 and len(parts[1]) == 8 and parts[1].isdigit():
            days.add(parts[1])
    if len(days) != 1:
        raise ValueError(
            "Cannot work out which files to re-download from the folder "
            f"contents (days found: {sorted(days)}); pass res_url explicitly."
        )
    day = days.pop()
    return [create_url(day[:4], day[4:6], day[6:], hour) for hour in range(24)]


def checkvalid_gzfile(gzfolder_path, res_url=None):
    """Ensure the folder contains 24 valid .gz files, otherwise re-download.

    Every ``.gz`` file in ``gzfolder_path`` is checked with ``is_valid_gz``
    and invalid ones are deleted. If any file was invalid, or the folder did
    not hold exactly 24 ``.gz`` files, the URLs are handed to
    ``execute_download_gzfiles``, which skips files that still exist.

    Args:
        gzfolder_path: Folder holding the downloaded archives.
        res_url: URLs the folder is expected to contain. When omitted, the
            URLs are rebuilt with ``create_url`` from the date in the
            existing file names, which must all belong to one day.

    Raises:
        ValueError: If a re-download is needed, ``res_url`` was not given and
            the day cannot be inferred from the folder contents.
    """
    gzfile_names = [name for name in os.listdir(gzfolder_path) if name.endswith('.gz')]

    # Validate regardless of the file count, so a corrupt file is also
    # replaced when other files are missing.
    invalid_files = []
    for gzfile_name in gzfile_names:
        gzfile_path = os.path.join(gzfolder_path, gzfile_name)
        if not is_valid_gz(gzfile_path):
            invalid_files.append(gzfile_path)

    for invalid_file in invalid_files:
        print(f"Removing invalid file: {invalid_file}")
        os.remove(invalid_file)

    if len(gzfile_names) == 24 and not invalid_files:
        return

    if len(gzfile_names) != 24:
        print(f"Expected 24 files, found {len(gzfile_names)}. Re-downloading...")
    else:
        print("Re-downloading missing or invalid files...")

    # Use the names seen before deletion so removed files are re-fetched too.
    urls = list(res_url) if res_url is not None else _infer_day_urls(gzfile_names)
    execute_download_gzfiles(urls, gzfolder_path)

# Define function to read .gz file, transform to dataframe, and save as parquet file
def process_single_file(spark, gzfile_name, gzfolder_path, parquet_folder_path, pagenames):
    """Aggregate one hourly pageviews archive and save the result as Parquet.

    The archive is read as text and each line is split on single spaces into
    domain, page name and view count. Rows whose page name is in
    ``pagenames`` are kept and summed per (domain, pagename), then written to
    ``<parquet_folder_path>/<archive name with .gz replaced by .parquet>``.

    Args:
        spark: Active SparkSession used to read the archive.
        gzfile_name: Archive name of the form ``<prefix>-YYYYMMDD-HHMMSS.gz``.
        gzfolder_path: Folder containing the archive.
        parquet_folder_path: Output folder; created if missing.
        pagenames: Page names to keep.

    Returns:
        None. If the output path already exists the file is skipped. Any
        exception is printed and passed to ``log_file`` rather than raised.
    """
    # Bound before the try block so the error handler can always refer to it.
    gzfile_path = None
    try:
        gzfile_path = os.path.join(gzfolder_path, gzfile_name)

        # Validate gzfile_name format (parsed once, reused below)
        name_parts = gzfile_name.strip().replace(".gz", "").split('-')
        if len(name_parts) != 3:
            raise ValueError(f"Unexpected gzfile_name format: {gzfile_name}")
        _, date_str, time_str = name_parts

        # A file stamped 000000 is attributed to the previous calendar day.
        file_date = datetime.strptime(date_str, "%Y%m%d")
        if time_str == "000000":
            file_date -= timedelta(days=1)
        date_collection = file_date.strftime("%Y-%m-%d")
        time_collection = f'{time_str[:2]}:{time_str[2:4]}:{time_str[4:]}'

        output_parquetfile_path = os.path.join(
            parquet_folder_path, gzfile_name.replace(".gz", ".parquet"))

        # Skip processing if the Parquet output already exists
        if os.path.exists(output_parquetfile_path):
            print(f"Skipping {gzfile_name}: Parquet file already exists.")
            return

        print(f"Processing {gzfile_name}")

        # Read the .gz file into a DataFrame (one row per line, column "value")
        df = spark.read.text(gzfile_path)

        # Split each line and keep only the columns that are actually used
        df_split = df.withColumn("split_value", split(df["value"], " "))
        fields = df_split["split_value"]
        df_transformed = df_split.select(
            fields.getItem(0).alias("domain"),
            fields.getItem(1).alias("pagename"),
            fields.getItem(2).cast(IntegerType()).alias("pageview"),
        ).filter(col("pagename").isin(pagenames))

        # NOTE: `sum` here is pyspark.sql.functions.sum, not the builtin.
        df_final = df_transformed.groupBy("domain", "pagename") \
            .agg(sum(col("pageview")).cast(IntegerType()).alias("sumpageviewcount")) \
            .withColumn("date_collection", lit(date_collection).cast(StringType())) \
            .withColumn("time_collection", lit(time_collection).cast(StringType())) \
            .orderBy("domain", "pagename")

        # Ensure the output directory exists
        os.makedirs(parquet_folder_path, exist_ok=True)
        df_final.write.parquet(output_parquetfile_path)

    except Exception as e:
        # Per-file boundary: report and continue so one bad archive does not
        # abort the remaining files.
        error_message = f"Error processing {gzfile_path or gzfile_name}: {e}"
        print(error_message)
        log_file(error_message)
      
# Define function to clean up the gz folder after processing
def remove_gzfiles(gzfolder_path):
    """Delete the ``.gz`` files located directly inside ``gzfolder_path``.

    Only regular files whose name ends with ``.gz`` are removed, matching the
    filter used when the archives are listed for validation and processing.
    Other files and subdirectories are left untouched, so an unrelated entry
    no longer gets deleted and a subdirectory no longer makes ``os.remove``
    fail.
    """
    for entry in os.listdir(gzfolder_path):
        entry_path = os.path.join(gzfolder_path, entry)
        if entry.endswith('.gz') and os.path.isfile(entry_path):
            os.remove(entry_path)

# In [None]:
# Main logic for testing
if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("gz_file_to_parquet") \
        .config("spark.executor.memory", "4g") \
        .config("spark.executor.cores", "4") \
        .config("spark.driver.memory", "4g") \
        .getOrCreate()

    gzfolder_path = "your_path/gzfolder"
    parquet_folder_path = "your_path/parquet/2016-02"
    pagenames = ["Google", "Facebook", "Amazon", "Microsoft", "Apple", 'Walmart']
    # Read by log_file() as a module-level name.
    filelog = r"your_path/combine_test.txt"

    year_pattern = 2016
    month_pattern = 2

    try:
        # The download folder is listed below, so it must exist up front.
        os.makedirs(gzfolder_path, exist_ok=True)

        for date in range(1, 2):
            # One URL per hour of the day
            res_url = [
                create_url(year_pattern, month_pattern, date=date, time=hour)
                for hour in range(0, 24)
            ]
            print(res_url)

            # run tasks
            # execute_download_gzfiles requires the destination folder as its
            # second argument; omitting it raises TypeError.
            execute_download_gzfiles(res_url, gzfolder_path)
            checkvalid_gzfile(gzfolder_path)

            for gzfile_name in os.listdir(gzfolder_path):
                if gzfile_name.endswith('.gz'):
                    process_single_file(spark, gzfile_name, gzfolder_path, parquet_folder_path, pagenames)

            remove_gzfiles(gzfolder_path)
    finally:
        # Release the Spark session even when a step above fails.
        spark.stop()