In [1]:
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType
from pyspark.sql.functions import input_file_name, lit, col, isnull
from pyspark.sql import functions as F
print(f"PySpark Version : {pyspark.__version__}")
import multiprocessing
print(f"Number of CPU: {multiprocessing.cpu_count()}")

PySpark Version : 3.4.1
Number of CPU: 8


In [2]:
#Create a spark Context class, with custom config
conf = SparkConf()
conf.set('spark.default.parallelism', 700)
conf.set('spark.sql.shuffle.partitions', 700)
conf.set('spark.driver.memory', '30g')
conf.set('spark.driver.cores', 8)
conf.set('spark.executor.cores', 8)
conf.set('spark.executor.memory', '30g')
sc = SparkContext.getOrCreate(conf)

23/07/28 22:22:22 WARN Utils: Your hostname, Hops-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.140 instead (on interface en0)
23/07/28 22:22:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/28 22:22:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
## Create spark session
spark = SparkSession.builder.master('local[*]').\
                config('spark.sql.debug.maxToStringFields', '100').\
                appName("ETFs Spark Airflow Docker").getOrCreate()

In [66]:

existing_schema = StructType([
    StructField("Date", StringType(), False),
    StructField("Open", FloatType(), False),
    StructField("High", FloatType(), False),
    StructField("Low", FloatType(), False),
    StructField("Close", FloatType(), False),
    StructField("Adj Close", FloatType(), False),
    StructField("Volume", FloatType(), False),
    StructField("Symbol", FloatType(), False),
    StructField("Security Name", FloatType(), False)

])

In [67]:
input_path = "../data/stocks_etfs/A.csv"
stock_df = spark.read.csv(input_path, header=True, schema=existing_schema)

In [68]:
stock_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- Close: float (nullable = true)
 |-- Adj Close: float (nullable = true)
 |-- Volume: float (nullable = true)
 |-- Symbol: float (nullable = true)
 |-- Security Name: float (nullable = true)



In [78]:
meta_symbol = spark.read.csv("../data/symbols_valid_meta.csv", header=True)
symbol_mapping = meta_symbol.select("Symbol", "Security Name").rdd.collectAsMap()
symbol_name = os.path.splitext(os.path.basename(input_path))[0]


'A'

In [70]:
stock_df.show(10)

+----------+---------+---------+---------+---------+---------+---------+------+-------------+
|      Date|     Open|     High|      Low|    Close|Adj Close|   Volume|Symbol|Security Name|
+----------+---------+---------+---------+---------+---------+---------+------+-------------+
|1999-11-18|32.546494| 35.76538|28.612303|31.473534|27.068665|6.25463E7|  null|         null|
|1999-11-19| 30.71352|30.758226|28.478184|28.880543|24.838577|1.52341E7|  null|         null|
|1999-11-22|29.551144|31.473534| 28.65701|31.473534|27.068665|6577800.0|  null|         null|
|1999-11-23|30.400572|31.205294|28.612303|28.612303| 24.60788|5975600.0|  null|         null|
|1999-11-24|28.701717| 29.99821|28.612303|29.372318|25.261524|4843200.0|  null|         null|
|1999-11-26|29.238197|29.685265|29.148785|29.461731|25.338428|1729400.0|  null|         null|
|1999-11-29| 29.32761|30.355865|29.014664|30.132332|25.915169|4074700.0|  null|         null|
|1999-11-30| 30.04292| 30.71352|29.282904|30.177038|25.95361

23/07/26 01:25:58 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 7, schema size: 9
CSV file: file:///Users/hople/working_folder/Bootcamp_practices/ML_PIPELINE_AIRFLOW_SPARK_DOCKER/Dockerize_entire_workflow/dags/data/stocks_etfs/A.csv


In [71]:
stock_df = stock_df.withColumn("Symbol", F.lit(symbol_name))
stock_df = stock_df.withColumn("Security Name", F.lit(symbol_mapping.get(symbol_name)))

In [None]:
stock_df.write.parquet

In [72]:
stock_df.show(10)

+----------+---------+---------+---------+---------+---------+---------+------+--------------------+
|      Date|     Open|     High|      Low|    Close|Adj Close|   Volume|Symbol|       Security Name|
+----------+---------+---------+---------+---------+---------+---------+------+--------------------+
|1999-11-18|32.546494| 35.76538|28.612303|31.473534|27.068665|6.25463E7|     A|Agilent Technolog...|
|1999-11-19| 30.71352|30.758226|28.478184|28.880543|24.838577|1.52341E7|     A|Agilent Technolog...|
|1999-11-22|29.551144|31.473534| 28.65701|31.473534|27.068665|6577800.0|     A|Agilent Technolog...|
|1999-11-23|30.400572|31.205294|28.612303|28.612303| 24.60788|5975600.0|     A|Agilent Technolog...|
|1999-11-24|28.701717| 29.99821|28.612303|29.372318|25.261524|4843200.0|     A|Agilent Technolog...|
|1999-11-26|29.238197|29.685265|29.148785|29.461731|25.338428|1729400.0|     A|Agilent Technolog...|
|1999-11-29| 29.32761|30.355865|29.014664|30.132332|25.915169|4074700.0|     A|Agilent Tech

In [73]:
stock_df.select("Security Name")

DataFrame[Security Name: string]

In [39]:
stock_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- Close: float (nullable = true)
 |-- Adj Close: float (nullable = true)
 |-- Volume: float (nullable = true)
 |-- Symbol: string (nullable = false)
 |-- Security Name: string (nullable = false)



In [27]:
import pandas as pd
from save_parquet import save_parquet
import os

#retain features columns
features = ['Symbol', 'Security Name', 'Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']
#path to save processed dataset
path = '../data/processed_stocks_etfs/'
#read metal symbol files
metal_symbol = pd.read_csv('../data/symbols_valid_meta.csv')
metal_symbol = metal_symbol[['Symbol', 'Security Name']]
#correct some wrong spelling, coresponding to Stock file name
metal_symbol['Symbol'] = metal_symbol['Symbol'].str.replace('$', '-',regex=False)
metal_symbol['Symbol'] = metal_symbol['Symbol'].str.replace('.V', '',regex=False)
#creat mapping dictionary
symbol_mapping = metal_symbol.set_index('Symbol').to_dict()['Security Name']

def add_name(file):
    #print(symbol_mapping)
    name = os.path.splitext(os.path.basename(file))[0]
    df = pd.read_csv(file)
    df['Symbol'] = name
    sec_name = symbol_mapping[name]
    df['Security Name'] = sec_name
    #print(sec_name)
    #df.name = name
    #return df
    save_parquet(df[features], name, path)

In [33]:
from multiprocessing import cpu_count

stocks_path = '../data/stocks_etfs/'
#path = '../data/processed_stocks_etfs/'
from load_files import load_file
#list of loaded csv files will split into n_processor, for parralezation process
n_processor = cpu_count()
#get batches of data, list of list
preprocessing_list = load_file(n_processor, stocks_path, 'csv')


def data_processing():
    '''
    Takes batch number as input
    Map function add_name for every dataframe in batch number in preprocessing_list
    '''
    temp = list(map(add_name, preprocessing_list))

#data_processing()


In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pyspark.sql.functions import lit
from pyspark.sql import functions as F
import os
from multiprocessing import cpu_count
from load_files import load_file #function load files into batches

#Create a spark Context class, with custom config to optimize the performance
#conf.set('spark.sql.adaptive.coalescePartitions.initialPartitionNum', 24)
#conf.set('spark.sql.adaptive.coalescePartitions.parallelismFirst', 'false')
#conf.set('spark.sql.files.minPartitionNum', 1)
conf = SparkConf()
conf.set('spark.default.parallelism', 700)
conf.set('spark.sql.shuffle.partitions', 700)
#conf.set('spark.sql.files.maxPartitionBytes', '500mb')
conf.set('spark.driver.memory', '30g')
conf.set('spark.driver.cores', 8)
conf.set('spark.executor.cores', 8)
conf.set('spark.executor.memory', '30g')
sc = SparkContext.getOrCreate(conf)

## Initialize SparkSession
spark = SparkSession.builder.master('local[*]').\
                config('spark.sql.debug.maxToStringFields', '100').\
                appName("ETFs Spark Airflow Docker").getOrCreate()


#stock dir
stocks_dir = "../data/stocks_etfs"
#processed data dir
processed_stocks_dir = "../data/processed_stocks_etfs"

#Mapping dict
meta_symbol = spark.read.csv("../data/symbols_valid_meta.csv", header=True)
symbol_mapping = meta_symbol.select("Symbol", "Security Name").rdd.collectAsMap()

#Define Schema for the data
existing_schema = StructType([
    StructField("Date", StringType(), False),
    StructField("Open", FloatType(), False),
    StructField("High", FloatType(), False),
    StructField("Low", FloatType(), False),
    StructField("Close", FloatType(), False),
    StructField("Adj Close", FloatType(), False),
    StructField("Volume", FloatType(), False),
    StructField("Symbol", FloatType(), False),
    StructField("Security Name", FloatType(), False)

])

def add_sym_sec_name(input_file):
    """
    Function adds Symbol and Security Name to stock file
    """
    # Read data from CSV into the DataFrame using the existing schema
    stock_df = spark.read.csv(input_file, header=True, schema=existing_schema)

    # Get Symbol name from input file
    symbol_name = os.path.splitext(os.path.basename(input_file))[0]

    # Adding Symbol and Security Name
    stock_df = stock_df.withColumn("Symbol", F.lit(symbol_name))
    stock_df = stock_df.withColumn("Security Name", F.lit(symbol_mapping.get(symbol_name)))

    # Save the preprocessed data to a parquet file
    output_file = os.path.join(processed_stocks_dir, f"{symbol_name}_preprocessed.parquet")
    stock_df.write.parquet(output_file, mode="overwrite")


def preprocessing_data():
    '''
    Takes batch number as input
    Map function add_sym_sec_name for every dataframe in batch number in preprocessing_list
    '''
    #list of loaded csv files will split into n_processor, for parralezation process in Airflow
    n_processor = cpu_count()
    #get batches of data
    preprocessing_list = load_file(n_processor, stocks_dir, 'csv')
    #temp = list(map(add_sym_sec_name, ('../data/stocks_etfs/A.csv')))
    #print(preprocessing_list)

preprocessing_data()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/28 01:06:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [2]:
str(preprocessing_list[0])

NameError: name 'preprocessing_list' is not defined

In [24]:
spark.read.csv(str(preprocessing_list[0]), header=True, schema=existing_schema).show(10)

+----------+----------+----------+----------+----------+----------+--------+------+-------------+
|      Date|      Open|      High|       Low|     Close| Adj Close|  Volume|Symbol|Security Name|
+----------+----------+----------+----------+----------+----------+--------+------+-------------+
|1988-02-04|       0.0|0.61728394| 0.5555556| 0.5555556|0.42403868| 38700.0|  null|         null|
|1988-02-05|       0.0|0.61728394| 0.5555556| 0.5555556|0.42403868|606300.0|  null|         null|
|1988-02-08|       0.0|0.61728394| 0.5555556| 0.5555556|0.42403868| 19000.0|  null|         null|
|1988-02-09|       0.0|0.61728394| 0.5555556| 0.5555556|0.42403868| 23100.0|  null|         null|
|1988-02-10|       0.0|0.61728394| 0.5555556| 0.5555556|0.42403868| 10000.0|  null|         null|
|1988-02-11|       0.0|0.61728394| 0.5555556| 0.5555556|0.42403868| 10900.0|  null|         null|
|1988-02-12|       0.0|0.61728394| 0.5555556| 0.5555556|0.42403868|     0.0|  null|         null|
|1988-02-16|       0

23/07/28 00:37:48 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 7, schema size: 9
CSV file: file:///Users/hople/working_folder/ML_PIPELINE_AIRFLOW_SPARK_DOCKER/dags/data/stocks_etfs/IPAR.csv


In [1]:
from load_files import load_file
from multiprocessing import cpu_count
stocks_dir = '../data/stocks_etfs/'
n_processor = cpu_count()
#get batches of data
preprocessing_list = load_file(n_processor, stocks_dir, 'csv')

len(preprocessing_list)

8

In [12]:
import pandas as pd
pd.read_parquet('../data/processed_stocks_etfs/IPAR_preprocessed.parquet/')

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume,Symbol,Security Name
0,1988-02-04,0.000000,0.617284,0.555556,0.555556,0.424039,38700.0,IPAR,"Inter Parfums, Inc. - Common Stock"
1,1988-02-05,0.000000,0.617284,0.555556,0.555556,0.424039,606300.0,IPAR,"Inter Parfums, Inc. - Common Stock"
2,1988-02-08,0.000000,0.617284,0.555556,0.555556,0.424039,19000.0,IPAR,"Inter Parfums, Inc. - Common Stock"
3,1988-02-09,0.000000,0.617284,0.555556,0.555556,0.424039,23100.0,IPAR,"Inter Parfums, Inc. - Common Stock"
4,1988-02-10,0.000000,0.617284,0.555556,0.555556,0.424039,10000.0,IPAR,"Inter Parfums, Inc. - Common Stock"
...,...,...,...,...,...,...,...,...,...
8099,2020-03-26,46.139999,50.970001,46.139999,49.240002,48.900555,198400.0,IPAR,"Inter Parfums, Inc. - Common Stock"
8100,2020-03-27,46.790001,49.220001,46.180000,47.869999,47.539997,99800.0,IPAR,"Inter Parfums, Inc. - Common Stock"
8101,2020-03-30,46.910000,50.419998,46.910000,49.770000,49.770000,106000.0,IPAR,"Inter Parfums, Inc. - Common Stock"
8102,2020-03-31,48.919998,49.650002,44.509998,46.349998,46.349998,186900.0,IPAR,"Inter Parfums, Inc. - Common Stock"
