# ROKY V
By Simen Svenkerud

Version: 1.0
Date: 2018.12.21

## Introduction

### Purpose

### Descriptions



In [None]:
# Load packages
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql import window
from pyspark.sql import Window
from pyspark.sql.functions import broadcast
import pyspark.sql.functions as F


In [None]:
# Create (or reuse) the Spark session used by every cell below.
# The executor settings must use Spark's property names 'spark.executor.*';
# the earlier 'spark.executer.*' spelling is not a property Spark reads, so
# the requested memory and core counts were never applied.
spark = (
    SparkSession.builder.appName('ROKY_V')
    .config('spark.executor.memory', '12g')
    .config('spark.executor.cores', 6)
    .getOrCreate()
)

### Set the base variables

In [None]:
# Location handed to the reader's `.load(...)` call when the data set is read.
Import_Location = ""
# Prefix prepended to the file names of both CSV exports.
Output = ""

# Lower bound (inclusive) applied to the Quarter column before the
# per-sector aggregation.
Data_Start = ""
# Value the FA_Instrument column must equal for a row to be kept; it is also
# embedded in the export file names.
FA_Instrument_Select = ""

### Set the Column parameters
In the section below, enter the column names exactly as they appear in the data set.

In [None]:
# Column compared against FA_Instrument_Select when the data is loaded.
FA_Instrument = ""
# Column used to repartition the data and to partition the lag window.
ID = ""
# Column that orders the lag window and is one of the two grouping keys.
Quarter = ""
# Column used as the second grouping key for the per-sector aggregation.
Sector = ""
# Column carried through the select; not used in any calculation shown here.
Name = ""
# Numeric input columns for the balance / transaction calculations.
Shares_Outstanding = ""
Price = ""
Exchange_Rate = ""

### Load financial data set to be processed

In [None]:
# Read the source data set, keep only the rows of the selected instrument,
# redistribute the rows by ID and narrow to the columns used further down.
# NOTE(review): 'header' and 'inferSchema' look like CSV reader options;
# presumably they have no effect on a Parquet source -- confirm.
Data = (
    spark.read.format('Parquet')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load(Import_Location)
    # Filter before the select: the FA_Instrument column is not selected.
    .where(F.col(FA_Instrument) == FA_Instrument_Select)
    .repartition(F.col(ID))
    .select(*[
        F.col(column_name)
        for column_name in (
            ID,
            Quarter,
            Sector,
            Name,
            Shares_Outstanding,
            Price,
            Exchange_Rate,
        )
    ])
)

In [None]:
# Window over the rows of each ID, ordered by Quarter. It is combined with
# F.lag below to read the value of the preceding row for the same ID.
# `Window.partitionBy` is the PySpark API name; `partitionedBy` does not exist
# and raised AttributeError.
delta = Window.partitionBy(F.col(ID)).orderBy(F.col(Quarter))

### Processing of the data

In [None]:
# Attach the preceding row's shares, price and exchange rate (per ID, in
# Quarter order) as *_Lag columns, then replace nulls in the shares column and
# in the three lag columns with 0.0 (the first row of each ID has no lag).
df1 = (
    Data
    .orderBy(F.col(Quarter))
    .select(
        '*',
        F.lag(Shares_Outstanding).over(delta).alias('Shares_Outstanding_Lag'),
        F.lag(Price).over(delta).alias('Price_Lag'),
        F.lag(Exchange_Rate).over(delta).alias('Exchange_Rate_Lag'),
    )
    .fillna({
        Shares_Outstanding: 0.0,
        'Shares_Outstanding_Lag': 0.0,
        'Price_Lag': 0.0,
        'Exchange_Rate_Lag': 0.0,
    })
)

In [None]:
# Derive the per-row building blocks of the decomposition:
#   Net_Transaction     change in shares outstanding versus the lagged row
#   Balance             shares outstanding * price
#   Delta_* / Mean_*    difference and simple average of current and lagged
#                       price / exchange rate
#   Balance_Lag         Balance of the preceding row for the same ID (0.0 when
#                       there is none)
# The column is named 'Net_Transaction' because that is the name the later
# cells read it under. The Mean_* expressions previously had an unclosed
# parenthesis, which made the whole cell a SyntaxError.
df2 = (
    df1
    .withColumn('Net_Transaction', F.col(Shares_Outstanding) - F.col('Shares_Outstanding_Lag'))
    .withColumn('Balance', F.col(Shares_Outstanding) * F.col(Price))
    .withColumn('Delta_Price', F.col(Price) - F.col('Price_Lag'))
    .withColumn('Delta_Exchange_rate', F.col(Exchange_Rate) - F.col('Exchange_Rate_Lag'))
    .withColumn('Mean_price', (F.col(Price) + F.col('Price_Lag')) / 2)
    .withColumn('Mean_Exchange_rate', (F.col(Exchange_Rate) + F.col('Exchange_Rate_Lag')) / 2)
    # Balance only exists after the withColumn above, so its lag is taken here.
    .select('*', F.lag('Balance').over(delta).alias('Balance_Lag'))
    .fillna({'Balance_Lag': 0.0})
)

In [None]:
# Split the change in Balance into transaction value, FX effect, price effect
# and a residual ('Other_Changes_Volumne'), then drop the helper columns.
# Fixes relative to the previous version of this cell:
#   * 'Net_Transaction' is read as a column name; it was previously passed as
#     an undefined Python variable (NameError).
#   * the transaction value column is created as 'Transaction_Value', the name
#     the formulas below and the per-sector aggregation read it under.
# Each withColumn depends on the ones above it, so the order matters.
df3 = (
    df2
    .withColumn('Transaction_Value', F.col('Net_Transaction') * F.col('Mean_price'))
    .withColumn(
        'Market_FX_Effect',
        (F.col(Exchange_Rate) / F.col('Exchange_Rate_Lag')) * F.col('Balance_Lag')
        + (F.col(Exchange_Rate) / F.col('Mean_Exchange_rate')) * F.col('Transaction_Value'),
    )
    .withColumn(
        'Market_Price_Effect',
        (F.col(Shares_Outstanding) - F.col('Net_Transaction')) * (F.col(Price) - F.col('Price_Lag'))
        + F.col('Net_Transaction') * (F.col(Price) - F.col('Mean_price'))
        - F.col('Market_FX_Effect'),
    )
    .withColumn(
        'Other_Changes_Volumne',
        F.col('Balance')
        - F.col('Balance_Lag')
        - F.col('Market_Price_Effect')
        - F.col('Market_FX_Effect')
        - F.col('Transaction_Value'),
    )
    # Residual as a share of the current balance; same numerator as the
    # 'Other_Changes_Volumne' column computed just above.
    .withColumn('pct_OVC', F.col('Other_Changes_Volumne') / F.col('Balance'))
    .drop(
        'Shares_Outstanding_Lag',
        'Price_Lag',
        'Exchange_Rate_Lag',
        'Delta_Price',
        'Delta_Exchange_rate',
        'Mean_price',
        'Mean_Exchange_rate',
        'Balance_Lag',
    )
)

In [None]:
# Aggregate the per-ID results to one row per (Quarter, Sector), keeping only
# quarters at or after Data_Start. Amount columns are summed; Price and
# pct_OVC are averaged. The tuple order below is the output column order.
df4 = (
    df3
    .where(F.col(Quarter) >= Data_Start)
    .groupBy(F.col(Quarter), F.col(Sector))
    .agg(*[
        aggregate(F.col(source)).alias(result_name)
        for aggregate, source, result_name in (
            (F.sum, Shares_Outstanding, 'Shares_Outstanding'),
            (F.sum, 'Net_Transaction', 'Net_Transaction'),
            (F.sum, 'Balance', 'Balance'),
            (F.mean, Price, 'Price'),
            (F.sum, 'Transaction_Value', 'Transaction_Value'),
            (F.sum, 'Market_FX_Effect', 'Market_FX_Effect'),
            (F.sum, 'Market_Price_Effect', 'Market_Price_Effect'),
            (F.sum, 'Other_Changes_Volumne', 'Other_Changes_Volumne'),
            (F.mean, 'pct_OVC', 'pct_OVC'),
        )
    ])
)

### Export the per-ID dataset

In [None]:
# Write the per-ID results as CSV (with header), overwriting earlier output.
# `.write.csv(` replaces the former `.write.csv.(`, which was a SyntaxError.
# The data is first collapsed to one partition and then sorted inside it:
# sorting before `repartition(1)` does not guarantee the row order of the
# written file, because the repartition shuffles the rows again.
(
    df3
    .repartition(1)
    .sortWithinPartitions(F.col(ID), F.col(Quarter))
    .write
    .csv(Output + FA_Instrument_Select + 'By_ID_Roky_V.csv', header='true', mode='Overwrite')
)

### Export the per-sector dataset

In [None]:
# Write the per-sector results as CSV (with header), overwriting earlier output.
# `.write.csv(` replaces the former `.write.csv.(`, which was a SyntaxError.
# The data is first collapsed to one partition and then sorted inside it:
# sorting before `repartition(1)` does not guarantee the row order of the
# written file, because the repartition shuffles the rows again.
(
    df4
    .repartition(1)
    .sortWithinPartitions(F.col(Quarter))
    .write
    .csv(Output + FA_Instrument_Select + 'By_Sector_Roky_V.csv', header='true', mode='Overwrite')
)

# Data Visualisation

## Summary of values