In [1]:
import findspark
findspark.init()


In [2]:
from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *

In [3]:
spark = SparkSession.builder.master("spark://192.168.31.120:7077").appName("Bigdata66").getOrCreate()

In [4]:
df= spark.read.format("csv").options(header="True", inferSchema = "True").load("hdfs://192.168.31.120:9000/goldData1.csv")

In [7]:
df.show()

+---+-------------+---------+---------+--------+--------+--------------------+--------+------+-------------+
|_c0|spreadProfile|bidSpread|askSpread|     bid|     ask|                time|platform|server|           ts|
+---+-------------+---------+---------+--------+--------+--------------------+--------+------+-------------+
|  0|        Prime|     11.4|     11.3|1829.036|1829.613|2022-01-01 12:03:...|     MT5| Live1|1640984684508|
|  1|     Standard|     14.1|     14.1|1829.009|1829.641|2022-01-01 12:03:...|     MT5| Live1|1640984684508|
|  2|      Premium|     12.8|     12.7|1829.022|1829.627|2022-01-01 12:03:...|     MT5| Live1|1640984684508|
|  3|        Prime|     17.5|     17.5|1828.975|1829.675|2022-01-01 12:03:...|     MT4| Real2|1640984684522|
|  4|     Standard|     25.0|     25.0|  1828.9| 1829.75|2022-01-01 12:03:...|     MT4| Real2|1640984684522|
|  5|     Standard|      6.4|      6.3|1829.086|1829.563|2022-01-01 12:03:...|     MT4| Live2|1640984684531|
|  6|        Prime|

In [5]:
df.cache()
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- spreadProfile: string (nullable = true)
 |-- bidSpread: double (nullable = true)
 |-- askSpread: double (nullable = true)
 |-- bid: double (nullable = true)
 |-- ask: double (nullable = true)
 |-- time: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- server: string (nullable = true)
 |-- ts: long (nullable = true)



In [10]:
df.columns

['_c0',
 'spreadProfile',
 'bidSpread',
 'askSpread',
 'bid',
 'ask',
 'time',
 'platform',
 'server',
 'ts']

In [11]:
from pyspark.ml.feature import StringIndexer

inputs = ['spreadProfile', 'platform', 'server']
outputs = ['nSpreadProfile', 'nPlatform', 'nServer']
indexer = StringIndexer(inputCols=inputs, outputCols=outputs)
indexed = indexer.fit(df).transform(df)

In [12]:
for item in indexed.head(5):
    print(item)

Row(_c0=0, spreadProfile='Prime', bidSpread=11.4, askSpread=11.3, bid=1829.036, ask=1829.613, time='2022-01-01 12:03:14.711000', platform='MT5', server='Live1', ts=1640984684508, nSpreadProfile=0.0, nPlatform=2.0, nServer=0.0)
Row(_c0=1, spreadProfile='Standard', bidSpread=14.1, askSpread=14.1, bid=1829.009, ask=1829.641, time='2022-01-01 12:03:14.711000', platform='MT5', server='Live1', ts=1640984684508, nSpreadProfile=2.0, nPlatform=2.0, nServer=0.0)
Row(_c0=2, spreadProfile='Premium', bidSpread=12.8, askSpread=12.7, bid=1829.022, ask=1829.627, time='2022-01-01 12:03:14.711000', platform='MT5', server='Live1', ts=1640984684508, nSpreadProfile=1.0, nPlatform=2.0, nServer=0.0)
Row(_c0=3, spreadProfile='Prime', bidSpread=17.5, askSpread=17.5, bid=1828.975, ask=1829.675, time='2022-01-01 12:03:14.711000', platform='MT4', server='Real2', ts=1640984684522, nSpreadProfile=0.0, nPlatform=0.0, nServer=6.0)
Row(_c0=4, spreadProfile='Standard', bidSpread=25.0, askSpread=25.0, bid=1828.9, ask=18

In [13]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols = [
    'bidSpread',
    'askSpread',
    'ts',
    'nSpreadProfile',
    'nPlatform',
    'nServer'], outputCol ='features')
output=assembler.transform(indexed)
output.select('features', 'bid', 'ask').show(5)

+--------------------+--------+--------+
|            features|     bid|     ask|
+--------------------+--------+--------+
|[11.4,11.3,1.6409...|1829.036|1829.613|
|[14.1,14.1,1.6409...|1829.009|1829.641|
|[12.8,12.7,1.6409...|1829.022|1829.627|
|[17.5,17.5,1.6409...|1828.975|1829.675|
|[25.0,25.0,1.6409...|  1828.9| 1829.75|
+--------------------+--------+--------+
only showing top 5 rows



In [14]:
final_data = output.select('features', 'bid', 'ask')
train_data, test_data = final_data.randomSplit([0.7, 0.3])
train_data.describe().show()

+-------+-------------------+-------------------+
|summary|                bid|                ask|
+-------+-------------------+-------------------+
|  count|             328193|             328193|
|   mean| 1829.1018999883315|  1829.605483019441|
| stddev|0.11683682143391022|0.04613097495157919|
|    min|             1828.9|           1829.508|
|    max|           1829.321|            1829.75|
+-------+-------------------+-------------------+



In [15]:
test_data.describe().show()

+-------+-------------------+-------------------+
|summary|                bid|                ask|
+-------+-------------------+-------------------+
|  count|             140671|             140671|
|   mean| 1829.1024698903714| 1829.6054160133656|
| stddev|0.11706345723559733|0.04616017140401893|
|    min|             1828.9|           1829.508|
|    max|           1829.321|            1829.75|
+-------+-------------------+-------------------+



In [16]:
from pyspark.ml.regression import LinearRegression

bid_lr = LinearRegression(featuresCol = 'features', labelCol = 'bid')
ask_lr = LinearRegression(featuresCol = 'features', labelCol = 'ask')
train_bid_model = bid_lr.fit(train_data)
train_ask_model = ask_lr.fit(train_data)
bid_results = train_bid_model.evaluate(train_data)
ask_results = train_ask_model.evaluate(train_data)

print("Rsquared bid error: ", bid_results.r2)
print("Rsquared ask error: ", ask_results.r2)

Rsquared bid error:  0.9829367014809114
Rsquared ask error:  0.9566633067640856


In [17]:
bid_test_data = test_data.select('features', 'bid')
ask_test_data = test_data.select('features', 'ask')



In [18]:
bid_prediction = train_bid_model.transform(bid_test_data)
bid_prediction.show(1)

+--------------------+--------+----------------+
|            features|     bid|      prediction|
+--------------------+--------+----------------+
|[3.9,3.8,1.640984...|1829.111|1829.10707740117|
+--------------------+--------+----------------+
only showing top 1 row



In [19]:
ask_prediction = train_ask_model.transform(ask_test_data)
ask_prediction.show(1)

+--------------------+--------+------------------+
|            features|     ask|        prediction|
+--------------------+--------+------------------+
|[3.9,3.8,1.640984...|1829.538|1829.5317037398004|
+--------------------+--------+------------------+
only showing top 1 row



In [6]:
df.createOrReplaceTempView("goldPrice")

In [21]:
spark.sql("""SELECT * from goldPrice""").show(10)

+---+-------------+---------+---------+--------+--------+--------------------+--------+------+-------------+
|_c0|spreadProfile|bidSpread|askSpread|     bid|     ask|                time|platform|server|           ts|
+---+-------------+---------+---------+--------+--------+--------------------+--------+------+-------------+
|  0|        Prime|     11.4|     11.3|1829.036|1829.613|2022-01-01 12:03:...|     MT5| Live1|1640984684508|
|  1|     Standard|     14.1|     14.1|1829.009|1829.641|2022-01-01 12:03:...|     MT5| Live1|1640984684508|
|  2|      Premium|     12.8|     12.7|1829.022|1829.627|2022-01-01 12:03:...|     MT5| Live1|1640984684508|
|  3|        Prime|     17.5|     17.5|1828.975|1829.675|2022-01-01 12:03:...|     MT4| Real2|1640984684522|
|  4|     Standard|     25.0|     25.0|  1828.9| 1829.75|2022-01-01 12:03:...|     MT4| Real2|1640984684522|
|  5|     Standard|      6.4|      6.3|1829.086|1829.563|2022-01-01 12:03:...|     MT4| Live2|1640984684531|
|  6|        Prime|

In [10]:
spark.sql("""select count(*) from goldPrice""").show()

+--------+
|count(1)|
+--------+
|  468864|
+--------+



In [10]:
spark.sql("""select _c0,spreadProfile, bidSpread, askSpread, bid, ask, time from goldPrice order by bid desc""").show(5)

+------+-------------+---------+---------+--------+--------+--------------------+
|   _c0|spreadProfile|bidSpread|askSpread|     bid|     ask|                time|
+------+-------------+---------+---------+--------+--------+--------------------+
|286518|        Prime|      3.9|      3.8|1829.321|1829.508|2022-01-02 02:01:...|
|286638|        Prime|      3.9|      3.8|1829.321|1829.508|2022-01-02 02:01:...|
|286542|        Prime|      3.9|      3.8|1829.321|1829.508|2022-01-02 02:01:...|
|286446|        Prime|      3.9|      3.8|1829.321|1829.508|2022-01-02 02:01:...|
|286566|        Prime|      3.9|      3.8|1829.321|1829.508|2022-01-02 02:01:...|
+------+-------------+---------+---------+--------+--------+--------------------+
only showing top 5 rows



In [11]:
spark.sql("""select _c0,spreadProfile, bidSpread, askSpread, bid, ask, time from goldPrice order by ask desc""").show(5)

+---+-------------+---------+---------+------+-------+--------------------+
|_c0|spreadProfile|bidSpread|askSpread|   bid|    ask|                time|
+---+-------------+---------+---------+------+-------+--------------------+
| 52|     Standard|     25.0|     25.0|1828.9|1829.75|2022-01-01 12:03:...|
|172|     Standard|     25.0|     25.0|1828.9|1829.75|2022-01-01 12:03:...|
| 76|     Standard|     25.0|     25.0|1828.9|1829.75|2022-01-01 12:03:...|
|  4|     Standard|     25.0|     25.0|1828.9|1829.75|2022-01-01 12:03:...|
|100|     Standard|     25.0|     25.0|1828.9|1829.75|2022-01-01 12:03:...|
+---+-------------+---------+---------+------+-------+--------------------+
only showing top 5 rows



In [22]:
spark.sql("""SELECT * from goldPrice where platform= 'MT4' and server='Live2'""").show(10)

+---+-------------+---------+---------+--------+--------+--------------------+--------+------+-------------+
|_c0|spreadProfile|bidSpread|askSpread|     bid|     ask|                time|platform|server|           ts|
+---+-------------+---------+---------+--------+--------+--------------------+--------+------+-------------+
|  5|     Standard|      6.4|      6.3|1829.086|1829.563|2022-01-01 12:03:...|     MT4| Live2|1640984684531|
|  6|        Prime|      3.9|      3.8|1829.111|1829.538|2022-01-01 12:03:...|     MT4| Live2|1640984684531|
|  7|      Premium|      4.9|      4.8|1829.101|1829.548|2022-01-01 12:03:...|     MT4| Live2|1640984684531|
| 29|     Standard|      6.4|      6.3|1829.086|1829.563|2022-01-01 12:03:...|     MT4| Live2|1640984684531|
| 30|        Prime|      3.9|      3.8|1829.111|1829.538|2022-01-01 12:03:...|     MT4| Live2|1640984684531|
| 31|      Premium|      4.9|      4.8|1829.101|1829.548|2022-01-01 12:03:...|     MT4| Live2|1640984684531|
| 53|     Standard|