In [1]:
import pyspark

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
spark = SparkSession.builder \
    .appName("Python Spark SQL basic example") \
    .config('spark.jar', "postgresql-42.2.14.jar") \
    .getOrCreate()



Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/03/30 22:48:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
jdbcDF = spark.read.format("jdbc"). \
options(
         url='jdbc:postgresql://host.docker.internal:5432/data6300', # jdbc:postgresql://<host>:<port>/<database>
         dbtable="historic_gas_info",
         user="aqidb",
         password="mypassword",
         driver='org.postgresql.Driver').\
load()

In [4]:
jdbcDF.printSchema()

root
 |-- date: date (nullable = true)
 |-- co: double (nullable = true)
 |-- no: double (nullable = true)
 |-- no2: double (nullable = true)
 |-- o3: double (nullable = true)
 |-- so2: double (nullable = true)
 |-- pm2_5: double (nullable = true)
 |-- pm10: double (nullable = true)
 |-- nh3: double (nullable = true)
 |-- aqi: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- coord_key: integer (nullable = true)



In [5]:
data = jdbcDF.select('*')

In [6]:
r= data.toPandas()
r

                                                                                

Unnamed: 0,date,co,no,no2,o3,so2,pm2_5,pm10,nh3,aqi,lon,lat,coord_key
0,2022-10-10,173.57,0.00,1.48,55.79,0.07,1.19,1.26,2.41,1.0,-102.462776,51.213890,1
1,2022-10-10,173.57,0.00,1.61,52.93,0.07,1.19,1.26,2.31,1.0,-102.462776,51.213890,1
2,2022-10-10,173.57,0.00,1.74,49.35,0.06,1.27,1.34,1.96,1.0,-102.462776,51.213890,1
3,2022-10-10,173.57,0.00,1.76,46.49,0.07,1.50,1.58,1.52,1.0,-102.462776,51.213890,1
4,2022-10-10,175.24,0.00,1.69,44.35,0.12,2.00,2.10,1.16,1.0,-102.462776,51.213890,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...
1199249,2023-03-10,240.33,0.20,1.80,72.96,0.72,0.96,1.01,0.32,2.0,-128.610764,54.515102,344
1199250,2023-03-10,260.35,0.11,4.20,60.08,0.59,1.37,1.47,0.33,2.0,-128.610764,54.515102,344
1199251,2023-03-10,273.70,0.00,5.74,52.21,0.86,1.71,1.87,0.44,1.0,-128.610764,54.515102,344
1199252,2023-03-10,280.38,0.00,6.43,49.35,1.16,2.10,2.31,0.51,1.0,-128.610764,54.515102,344


In [23]:
data = jdbcDF.select("co", 'no', 'no2', 'o3', 'so2', 'pm2_5', 'pm10', 'nh3', 'aqi')


In [24]:
assembler = VectorAssembler(inputCols=["co", 'no', 'no2', 'o3', 'so2', 'pm2_5', 'pm10', 'nh3'], outputCol='features')
data = assembler.transform(data)

In [25]:
#Split data into training and test set
train_data, test_data = data.randomSplit([0.7, 0.3])

rf = RandomForestClassifier(featuresCol='features', labelCol='aqi')
model = rf.fit(train_data)

23/03/30 22:59:20 WARN MemoryStore: Not enough space to cache rdd_112_0 in memory! (computed 149.9 MiB so far)
23/03/30 22:59:20 WARN BlockManager: Persisting block rdd_112_0 to disk instead.
                                                                                

In [26]:
predictions = model.transform(test_data)

In [27]:
#Evaluate the performance of the model

evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='aqi', metricName='accuracy')

# Add additional evaluation metrics
evaluator = evaluator.setMetricName('weightedPrecision').setMetricName('weightedRecall').setMetricName('f1')

# Calculate the evaluation metrics on the test set
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
weightedPrecision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
weightedRecall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

print("Accuracy = %g" % accuracy)
print("Weighted Precision = %g" % weightedPrecision)
print("Weighted Recall = %g" % weightedRecall)
print("F1 Score = %g" % f1)

[Stage 50:>                                                         (0 + 1) / 1]

Accuracy = 0.892804
Weighted Precision = 0.897669
Weighted Recall = 0.892804
F1 Score = 0.891948


                                                                                

In [28]:
predictions.select("co", 'no', 'no2', 'o3', 'so2', 'pm2_5', 'pm10', 'nh3', 'aqi','prediction')

DataFrame[co: double, no: double, no2: double, o3: double, so2: double, pm2_5: double, pm10: double, nh3: double, aqi: double, prediction: double]

In [29]:
predictions.select("co", 'no', 'no2', 'o3', 'so2', 'pm2_5', 'pm10', 'nh3', 'aqi','prediction').write.format("jdbc") \
        .options(
         url='jdbc:postgresql://localhost:5432/data6300', # jdbc:postgresql://<host>:<port>/<database>
         dbtable='forecast',
         user='postgres',
         password='bigdata',
         driver='org.postgresql.Driver').mode("append").save()

Py4JJavaError: An error occurred while calling o937.save.
: org.postgresql.util.PSQLException: Connection to localhost:5432 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.
	at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:342)
	at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:54)
	at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:263)
	at org.postgresql.Driver.makeConnection(Driver.java:443)
	at org.postgresql.Driver.connect(Driver.java:297)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection(BasicConnectionProvider.scala:49)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider$.create(ConnectionProvider.scala:68)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$createConnectionFactory$1(JdbcUtils.scala:64)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:49)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at org.postgresql.core.PGStream.createSocket(PGStream.java:243)
	at org.postgresql.core.PGStream.<init>(PGStream.java:98)
	at org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:132)
	at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:258)
	... 49 more


23/03/31 18:17:44 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1660137 ms exceeds timeout 120000 ms
23/03/31 18:17:44 WARN SparkContext: Killing executors is not supported by current scheduler.


In [38]:
predictions.select("co", 'no', 'no2', 'o3', 'so2', 'pm2_5', 'pm10', 'nh3', 'aqi','prediction').write.format("jdbc") \
.option("url","jdbc:postgresql://localhost:5432/data6300") \
.option("driver","org.postgresql.Driver").option("dbtable", "forecast") \
.option("user", "postgres").option("password", "bigdata").mode("append").save()