In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from google.cloud import storage
from pyspark.sql.functions import lit
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import *

In [2]:
spark = SparkSession.builder.appName('Nifty50').getOrCreate()

#change configuration settings on Spark 
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '30g'), ('spark.app.name', 'Spark Updated Conf'), ('spark.executor.cores', '4'), ('spark.cores.max', '4'), ('spark.driver.memory','30g'), ("spark.driver.maxResultSize", "70g")])

gcs_client = storage.Client()

In [3]:
bucket_name = 'bigdata_nifty50'

In [4]:
bucket = gcs_client.bucket(bucket_name)

In [5]:
sector_file = list(bucket.list_blobs(prefix='NIFTY50_Kaggle/ind'))[0]

In [6]:
sector_df = spark.read.csv('gs://{}//{}'.format(bucket_name, sector_file.name), header=True).toPandas()

                                                                                

In [7]:
other_ind_sector = {
    'AUROPHARMA': 'Healthcare',
    'HINDPETRO': 'Oil Gas & Consumable Fuels',
    'IGL': 'Oil Gas & Consumable Fuels',
    'JINDALSTEL': 'Metals & Mining',
    'JUBLFOOD': 'Fast Moving Consumer Goods',
    'LUPIN': 'Healthcare',
    'MM': 'Automobile and Auto Components',
    'NIFTY 50': 'NIFTY 50',
    'NIFTY BANK': 'NIFTY BANK',
    'NMDC': 'Metals & Mining',
    'PEL': 'Healthcare',
    'PNB': 'Financial Services',
    'SAIL': 'Metals & Mining',
    'YESBANK': 'Financial Services'
}

In [8]:
dfs = []
for file in list(bucket.list_blobs(prefix='NIFTY50_Kaggle/')):
    if 'csv' in file.name and 'ind_nifty100list' not in file.name:
        company_ticker = file.name.split('/')[1].split('_')[0]
        try:
            sector = list(sector_df.loc[sector_df['Symbol']==company_ticker]['Industry'])[0]
        except IndexError:
            sector = other_ind_sector[company_ticker]
            
        temp_df = spark.read.csv('gs://{}/{}'.format(bucket_name,file.name), inferSchema=True, header=True)
        temp_df = temp_df.withColumn('sector', lit(sector)).withColumn('company', lit(company_ticker))
        dfs.append(temp_df)

                                                                                

In [9]:
df = reduce(DataFrame.unionAll, dfs)

In [10]:
from datetime import datetime
from pyspark.sql.types import TimestampType
def convert_str_to_date(string):
    date = string.split('+')[0]
    return datetime.strptime(date, '%Y-%m-%d %H:%M:%S')
udf_str_to_date = udf(convert_str_to_date, TimestampType())   

In [11]:
df = df.withColumn('date', udf_str_to_date(col('date')))

In [12]:
df.show(1)

22/11/19 16:38:02 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
22/11/19 16:38:09 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.3 MiB
[Stage 204:>                                                        (0 + 1) / 1]

+-------------------+------+------+------+------+------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+-----------------+----------------+----------------+------------------+-------+-----------------+------------------+------------------+------------------+------------------+-----------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+--------------------+-----------------+------------------+-----+-------------------+--------------------+-------------------+--------------------+-------------------+-----------------+------------------+------------------+------------------+------------------+------------------+-------+-----------------+----------------+------------------+------------------+------------------+-------

                                                                                

In [14]:
blob = bucket.blob('combined.csv')

In [16]:
combined_file = 'combined.csv'

In [17]:
df.write.csv('gs://{}/{}'.format(bucket_name, combined_file))

22/11/19 16:40:30 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.3 MiB
                                                                                