In [1]:
import os
import shutil
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Requires the Spark master and at least one worker to be running already.
spark = (
    SparkSession.builder
    .appName("My Spark Application")
    .master("spark://frank-xps:7077")
    .getOrCreate()
)

23/11/05 19:35:54 WARN Utils: Your hostname, frank-xps resolves to a loopback address: 127.0.1.1; using 192.168.1.63 instead (on interface wlp0s20f3)
23/11/05 19:35:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/05 19:35:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Download the dataset from:
#   https://drive.google.com/file/d/1kCXnIeoPT6p9kS_ANJ0mmpxlfDwK1yio/view
# `path_dataset` is the local directory that holds it.
path_dataset = '/media/frank/My Passport/datalab/datasets/big_dataset'
products_table = spark.read.parquet("{}/products_parquet".format(path_dataset))
products_table.count()


                                                                                

75000000

In [10]:
# Row count of a 10% sample of the full table (no fixed seed, so it varies per run).
products_table.sample(fraction=0.1).count()

7497262

In [28]:
class DataframeReducer:
    '''
    Swap a table directory for a sampled ("reduced") copy of itself, and back.

    For a table stored at ``<base_dir>/<table_name>`` two sibling directories
    are used:

    * ``<table_name>_original`` -- the full table, parked there the first time
      the table is sampled;
    * ``<table_name>_reduced``  -- scratch output of the latest sampling, which
      is then moved over ``<table_name>``.

    Python2 complying.

    * This class will have a bad behavior if it is used in a concurrent way
      over the same table.
    * Only the local file system is implemented: when ``hdfs`` is true every
      existence check reports False and every move/remove/copy is a no-op.
    '''
    REDUCED_SUFFIX = "reduced"
    ORIGINAL_SUFFIX = "original"

    def __init__(self, base_dir, format_file='orc', number_partition_output=1, hdfs=True, spark=None):
        '''
        :param base_dir: directory that contains the table directories.
        :param format_file: Spark data source format used to read and write.
        :param number_partition_output: partitions of the reduced table.
        :param hdfs: True when tables live on HDFS (file operations are then
            skipped, see the class docstring); False for the local file system.
        :param spark: optional Spark session used by ``sample``; when omitted
            the module-level ``spark`` variable is looked up at call time.
        '''
        self.base_dir = base_dir
        self.format_file = format_file
        self.hdfs = hdfs
        self.number_partition_output = number_partition_output
        self.spark = spark

    @staticmethod
    def _suffixed(table_name, suffix):
        '''Return the sibling table name ``<table_name>_<suffix>``.'''
        return "{}_{}".format(table_name, suffix)

    def _path(self, table_name):
        '''Return the location of ``table_name`` under ``base_dir``.'''
        return os.path.join(self.base_dir, table_name)

    def _replace_table(self, table_name_src, table_name_dst):
        '''Move ``table_name_src`` over ``table_name_dst``, deleting the latter first.'''
        if not self.hdfs and not os.path.exists(self._path(table_name_src)):
            # Fail *before* deleting the destination, otherwise a missing
            # source would cost us the only remaining copy of the table.
            raise IOError("Table to move does not exist: {}".format(self._path(table_name_src)))
        self.remove_dataframe(table_name_dst)
        self.move_dataframe(table_name_src, table_name_dst)

    def sample_with_replacement(self, table_name, percentage):
        '''
        Sample original table and put it into working path (without suffix).

        "Replacement" refers to replacing the working table; rows are drawn
        without replacement. ``percentage`` is a percent strictly between 0
        and 100 (``0.1`` keeps roughly one row in a thousand).

        If sampling fails, the working table is left exactly as it was before
        the call.
        '''
        original_already_parked = self.check_original_dataframe_exists(table_name)
        if not original_already_parked:
            self.move_from_current_to_original(table_name)
        print("***** Sampling... *****")
        if not self.sample(table_name, percentage):
            if not original_already_parked:
                # Undo the parking done above so the working table is back.
                self.move_from_original_to_current(table_name)
            return
        print("***** Putting reduced table into working table... ***** ")
        self.move_from_reduced_to_current(table_name)

    def sample(self, table_name, percentage):
        '''
        Write a ``percentage`` percent sample of ``<table_name>_original`` to
        ``<table_name>_reduced``.

        Errors are printed rather than raised.

        :return: True when the reduced table was written, False otherwise.
        '''
        try:
            # Written as a chained comparison so that NaN is rejected as well.
            if not (0 < percentage < 100):
                raise ValueError("Percentage must be a float between excluded 0 and 100.")
            input_path = self._path(self._suffixed(table_name, DataframeReducer.ORIGINAL_SUFFIX))
            output_path = self._path(self._suffixed(table_name, DataframeReducer.REDUCED_SUFFIX))
            # Fall back to the notebook-level `spark` session when none was injected.
            session = self.spark if self.spark is not None else spark
            df_input = session.read.format(self.format_file).load(input_path)
            print("Total number of rows: {}".format(df_input.count()))
            print("Dataframe is written at : {}".format(output_path))
            print("Dataframe output format is : {}".format(self.format_file))
            # 100.0 keeps the division a true division under Python 2 too.
            df_output = df_input.sample(withReplacement=False, fraction=percentage / 100.0, seed=None)
            print("Reduced table partitions: {}".format(self.number_partition_output))
            print("Reduced table number of rows: {}".format(df_output.count()))
            df_output.coalesce(self.number_partition_output).write.format(self.format_file).mode('overwrite').save(output_path)
        except Exception as e:
            print(e)
            return False
        return True

    def reset(self, table_name):
        '''Restore the original table as the working table and drop the reduced one.'''
        if self.check_original_dataframe_exists(table_name):
            self.move_from_original_to_current(table_name)
        self.remove_dataframe(self._suffixed(table_name, DataframeReducer.REDUCED_SUFFIX))

    def move_from_original_to_current(self, table_name):
        '''Replace the working table with ``<table_name>_original``.'''
        self._replace_table(self._suffixed(table_name, DataframeReducer.ORIGINAL_SUFFIX), table_name)

    def move_from_reduced_to_current(self, table_name):
        '''Replace the working table with ``<table_name>_reduced``.'''
        self._replace_table(self._suffixed(table_name, DataframeReducer.REDUCED_SUFFIX), table_name)

    def move_from_current_to_original(self, table_name):
        '''Park the working table as ``<table_name>_original``.'''
        self._replace_table(table_name, self._suffixed(table_name, DataframeReducer.ORIGINAL_SUFFIX))

    def check_original_dataframe_exists(self, table_name):
        '''Return True when ``<table_name>_original`` exists under ``base_dir``.'''
        # Pass the table *name*: check_dataframe_exists prepends base_dir itself.
        return self.check_dataframe_exists(self._suffixed(table_name, DataframeReducer.ORIGINAL_SUFFIX))

    def check_dataframe_exists(self, table_name):
        '''Return True when ``<base_dir>/<table_name>`` is a directory (always False on HDFS).'''
        if self.hdfs:
            return False
        return os.path.isdir(self._path(table_name))

    def remove_dataframe(self, table_name):
        '''Delete the table directory if present; a failed deletion is printed, not raised.'''
        if self.hdfs or not self.check_dataframe_exists(table_name):
            return
        dataframe_path = self._path(table_name)
        try:
            shutil.rmtree(dataframe_path)
        except OSError as e:
            print("Error: %s : %s" % (dataframe_path, e.strerror))

    def move_dataframe(self, table_name_src, table_name_dst):
        '''Move ``table_name_src`` to ``table_name_dst`` (both relative to ``base_dir``).'''
        if not self.hdfs:
            shutil.move(self._path(table_name_src), self._path(table_name_dst))

    def copy_dataframe(self, table_name_src, table_name_dst):
        '''Copy ``table_name_src`` to ``table_name_dst`` (both relative to ``base_dir``).'''
        source_path = self._path(table_name_src)
        destination_path = self._path(table_name_dst)
        if not self.hdfs:
            if os.path.isdir(source_path):
                # Tables are directories; shutil.copy only handles single files.
                shutil.copytree(source_path, destination_path)
            else:
                shutil.copy(source_path, destination_path)
        

In [38]:
# Local working directory holding the tables, and the format they are stored in.
output = "./data"
format_file = "parquet"
# hdfs=False: operate on the local file system.
redu = DataframeReducer(base_dir=output, format_file=format_file, hdfs=False)

In [39]:
# percentage is divided by 100 inside the reducer, so 0.1 keeps about 0.1% of the rows.
redu.sample_with_replacement(table_name="products_parquet", percentage=0.1)

***** Sampling... *****
Total number of rows: 75000000
Dataframe is written at : /home/frank/Code/ComputerScience-Data/Data/DataEngineering/spark/pyspark/data/products_parquet_reduced
Dataframe output format is : parquet
Reduced table partitions: 1
Reduced table number of rows: 75214
***** Putting reduced table into working table... ***** 


In [47]:
# Put the full table back in place and drop the reduced copy.
redu.reset(table_name="products_parquet")

In [48]:
# Re-read the working table to confirm the full row count is back.
products_table = spark.read.format("parquet").load("./data/products_parquet")
products_table.count()

75000000

In [49]:
# After reset() the "_original" copy should be gone; the read error is shown as the evidence.
try:
    products_table_original = spark.read.format("parquet").load("./data/products_parquet_original")
    print(products_table_original.count())
except Exception as err:
    print(err)

Path does not exist: file:/home/frank/Code/ComputerScience-Data/Data/DataEngineering/spark/pyspark/data/products_parquet_original;


In [50]:
# Likewise the "_reduced" scratch table should no longer exist.
try:
    products_table_reduced = spark.read.format("parquet").load("./data/products_parquet_reduced")
    print(products_table_reduced.count())
except Exception as err:
    print(err)

Path does not exist: file:/home/frank/Code/ComputerScience-Data/Data/DataEngineering/spark/pyspark/data/products_parquet_reduced;
