In [2]:
import math

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.linalg import Vectors
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
import numpy as np

from pyspark.sql.types import StructType, StructField, StringType,FloatType
# from pyecharts.charts import Page, Line
from sklearn.metrics import mean_squared_error
from tensorflow.python.keras.layers import LSTM, Dense
from tensorflow.python.keras.models import Sequential
import matplotlib.pyplot as plt
from datetime import datetime ,timedelta
# from pyecharts import options as opts

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
readfile = spark.read.options(header='True', inferSchema='True', delimiter=',').csv("weather.csv")
readfile.printSchema()

root
 |-- positionId: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- date_time: integer (nullable = true)
 |-- temperature: integer (nullable = true)
 |-- rain: double (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- windDirection: string (nullable = true)
 |-- windPower: integer (nullable = true)
 |-- fullName: string (nullable = true)
 |-- createTime: string (nullable = true)



In [3]:
readfile.show()

+----------+------+---------+-----------+----+--------+-------------+---------+-----------------------+-------------------+
|positionId|  name|date_time|temperature|rain|humidity|windDirection|windPower|               fullName|         createTime|
+----------+------+---------+-----------+----+--------+-------------+---------+-----------------------+-------------------+
|     50101|哈尔滨|       22|        -19| 0.0|      63|         西风|        2|全国-黑龙江-哈尔滨-城区|2021-12-22 22:28:24|
|     50101|哈尔滨|       23|        -21| 0.0|      72|       西北风|        1|全国-黑龙江-哈尔滨-城区|2021-12-22 22:28:24|
|     50101|哈尔滨|        0|        -20| 0.0|      65|       西南风|        1|全国-黑龙江-哈尔滨-城区|2021-12-22 22:28:24|
|     50101|哈尔滨|        1|        -22| 0.0|      75|         西风|        2|全国-黑龙江-哈尔滨-城区|2021-12-22 22:28:24|
|     50101|哈尔滨|        2|        -22| 0.0|      75|         西风|        2|全国-黑龙江-哈尔滨-城区|2021-12-22 22:28:24|
|     50101|哈尔滨|        3|        -24| 0.0|      79|       西南风|        1|全国-黑龙江-哈尔滨-城

In [8]:
###增加一列 就是城市
from datetime import datetime,timedelta
from pyspark.sql.functions import col,concat,count
from pyspark.sql.types import StringType
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

def city(fullName):
    tmp=fullName.split("-")
    return tmp[1]+tmp[2]
city = udf(city, StringType())

df = readfile.withColumn("city",city(readfile.fullName))

df.show() 

+----------+------+---------+-----------+----+--------+-------------+---------+-----------------------+-------------------+------------+
|positionId|  name|date_time|temperature|rain|humidity|windDirection|windPower|               fullName|         createTime|        city|
+----------+------+---------+-----------+----+--------+-------------+---------+-----------------------+-------------------+------------+
|     50101|哈尔滨|       22|        -19| 0.0|      63|         西风|        2|全国-黑龙江-哈尔滨-城区|2021-12-22 22:28:24|黑龙江哈尔滨|
|     50101|哈尔滨|       23|        -21| 0.0|      72|       西北风|        1|全国-黑龙江-哈尔滨-城区|2021-12-22 22:28:24|黑龙江哈尔滨|
|     50101|哈尔滨|        0|        -20| 0.0|      65|       西南风|        1|全国-黑龙江-哈尔滨-城区|2021-12-22 22:28:24|黑龙江哈尔滨|
|     50101|哈尔滨|        1|        -22| 0.0|      75|         西风|        2|全国-黑龙江-哈尔滨-城区|2021-12-22 22:28:24|黑龙江哈尔滨|
|     50101|哈尔滨|        2|        -22| 0.0|      75|         西风|        2|全国-黑龙江-哈尔滨-城区|2021-12-22 22:28:24|黑龙江哈尔滨|
|     50101

In [22]:

###统计各个城市过去24小时的平均气温
df.createOrReplaceTempView("weather")
##显示前20条数据
result=spark.sql("select city,avg(temperature) as avgtemperature from weather group by city")

result.show()

# print(conn)
# print(cursor)

+------------+------------------+
|        city|    avgtemperature|
+------------+------------------+
|    浙江杭州| 9.914285714285715|
|    浙江温州|12.846666666666666|
|    辽宁朝阳|-7.222857142857142|
|    香港新界|             17.88|
|    贵州遵义|             6.984|
|    四川达州|             6.045|
|    河北沧州|1.1341176470588235|
|    吉林船营|            -17.32|
|    福建泉州|15.588571428571429|
|    云南玉溪|              9.46|
|    甘肃兰州|-4.671111111111111|
|    广东潮州|             16.76|
|    广东佛山|            17.464|
|    青海海北|              -6.6|
|    甘肃庆阳|              1.32|
|    四川绵阳| 8.381818181818181|
|    云南楚雄|              7.68|
|    吉林昌邑|            -17.32|
|    湖南永州|12.053333333333333|
|新疆可克达拉|             -8.12|
+------------+------------------+
only showing top 20 rows



In [27]:
##存储数据到 数据库表 temperature
import pymysql

# 建立数据库连接
conn = pymysql.Connect(
    host='localhost',##mysql服务器地址
    port=3306,##mysql服务器端口号
    user='root',##用户名
    passwd='MyNewPwd123!@#',##密码
    db='weather',##数据库名
    charset='utf8'##连接编码
)

# 获取游标
cursor = conn.cursor()

for item  in  result.collect():
    print(item.city,item.avgtemperature)
    sql="INSERT INTO temperature(city,avgtemperature)values (%s,%s)"

    data=(item.city,item.avgtemperature)
    cursor.execute(sql,data)
    conn.commit()

浙江杭州 9.914285714285715
浙江温州 12.846666666666666
辽宁朝阳 -7.222857142857142
香港新界 17.88
贵州遵义 6.984
四川达州 6.045
河北沧州 1.1341176470588235
吉林船营 -17.32
福建泉州 15.588571428571429
云南玉溪 9.46
甘肃兰州 -4.671111111111111
广东潮州 16.76
广东佛山 17.464
青海海北 -6.6
甘肃庆阳 1.32
四川绵阳 8.381818181818181
云南楚雄 7.68
吉林昌邑 -17.32
湖南永州 12.053333333333333
新疆可克达拉 -8.12
海南定安 18.88
山东德州 1.603076923076923
山东临沂 5.68
湖北武汉 8.605714285714285
河北雄安新区 1.1314285714285715
宁夏固原 -0.9466666666666667
江苏盐城 5.364
澳门氹仔岛 18.84
黑龙江七台河 -21.128
新疆石河子 -10.16
广东云浮 17.2
广东东莞 17.76
甘肃酒泉 -3.62
浙江绍兴 9.497142857142856
湖南常德 11.268
湖北潜江 9.28
吉林辽源 -16.24
甘肃金昌 -3.04
贵州毕节 6.471111111111111
云南西双版纳 12.44
黑龙江佳木斯 -24.0
湖北咸宁 10.754285714285714
江苏南京 8.448
安徽黄山 7.635555555555555
贵州黔西南 10.146666666666667
山西阳泉 -1.12
贵州黔东南 8.783529411764706
新疆伊犁 -7.54
青海西宁 -4.28
甘肃定西 -3.465
四川阿坝 -2.1784615384615384
辽宁抚顺 -12.293333333333333
吉林长春 -17.534545454545455
吉林磐石 -18.0
辽宁辽阳 -7.77
山东威海 3.8685714285714288
河南三门峡 5.37
江苏常州 7.68
广西百色 13.2
山西晋城 2.7266666666666666
宁夏中卫 -0.83
广西梧州 15.905
江苏镇江 7.5

In [28]:
###各个城市过去24小时的平均湿度
df.createOrReplaceTempView("weather")
##显示前20条数据
result=spark.sql("select city,avg(humidity) as avghumidity from weather group by city")
result.show()

+------------+------------------+
|        city|       avghumidity|
+------------+------------------+
|    浙江杭州|             74.76|
|    浙江温州|             73.62|
|    辽宁朝阳|40.245714285714286|
|    香港新界|             79.48|
|    贵州遵义| 77.40266666666666|
|    四川达州|             89.25|
|    河北沧州|             82.56|
|    吉林船营|              68.6|
|    福建泉州|             78.44|
|    云南玉溪|            78.628|
|    甘肃兰州| 34.90222222222222|
|    广东潮州|             93.16|
|    广东佛山|            69.672|
|    青海海北|             42.08|
|    甘肃庆阳| 35.87555555555556|
|    四川绵阳| 75.92727272727272|
|    云南楚雄|            73.576|
|    吉林昌邑|              68.6|
|    湖南永州| 79.66333333333333|
|新疆可克达拉|             85.32|
+------------+------------------+
only showing top 20 rows



In [30]:
for item  in  result.collect():
    print(item.city,item.avghumidity)
    sql="INSERT INTO humidity(city,avghumidity)values (%s,%s)"

    data=(item.city,item.avghumidity)
    cursor.execute(sql,data)
    conn.commit()

浙江杭州 74.76
浙江温州 73.62
辽宁朝阳 40.245714285714286
香港新界 79.48
贵州遵义 77.40266666666666
四川达州 89.25
河北沧州 82.56
吉林船营 68.6
福建泉州 78.44
云南玉溪 78.628
甘肃兰州 34.90222222222222
广东潮州 93.16
广东佛山 69.672
青海海北 42.08
甘肃庆阳 35.87555555555556
四川绵阳 75.92727272727272
云南楚雄 73.576
吉林昌邑 68.6
湖南永州 79.66333333333333
新疆可克达拉 85.32
海南定安 87.32
山东德州 86.36615384615385
山东临沂 86.02153846153846
湖北武汉 77.66857142857143
河北雄安新区 67.10857142857142
宁夏固原 35.92666666666667
江苏盐城 91.504
澳门氹仔岛 74.56
黑龙江七台河 60.864
新疆石河子 80.66666666666667
广东云浮 83.24
广东东莞 76.16
甘肃酒泉 26.9
浙江绍兴 82.37142857142857
湖南常德 70.432
湖北潜江 75.96
吉林辽源 64.248
甘肃金昌 30.466666666666665
贵州毕节 72.74222222222222
云南西双版纳 86.0
黑龙江佳木斯 64.07272727272728
湖北咸宁 63.70857142857143
江苏南京 68.024
安徽黄山 58.44444444444444
贵州黔西南 78.25333333333333
山西阳泉 58.744
贵州黔东南 76.30588235294118
新疆伊犁 82.40666666666667
青海西宁 29.58
甘肃定西 44.425
四川阿坝 46.84923076923077
辽宁抚顺 55.53333333333333
吉林长春 58.338181818181816
吉林磐石 65.68
辽宁辽阳 40.15
山东威海 62.182857142857145
河南三门峡 40.44
江苏常州 77.94285714285714
广西百色 80.07076923076923
山西

In [31]:
###各个城市过去24小时的平均降雨量
df.createOrReplaceTempView("weather")
##显示前20条数据
result=spark.sql("select city,avg(rain) as avgrain from weather group by city")
result.show()

+------------+--------------------+
|        city|             avgrain|
+------------+--------------------+
|    浙江杭州|                 0.0|
|    浙江温州|3.333333333333334E-4|
|    辽宁朝阳|                 0.0|
|    香港新界|                 0.0|
|    贵州遵义|                 0.0|
|    四川达州|0.001500000000000...|
|    河北沧州|                 0.0|
|    吉林船营|                 0.0|
|    福建泉州| 0.02457142857142857|
|    云南玉溪|              8.0E-4|
|    甘肃兰州|                 0.0|
|    广东潮州|               0.034|
|    广东佛山|                 0.0|
|    青海海北|                 0.0|
|    甘肃庆阳|4.444444444444444...|
|    四川绵阳|                 0.0|
|    云南楚雄|              0.0148|
|    吉林昌邑|                 0.0|
|    湖南永州|                 0.0|
|新疆可克达拉|                 0.0|
+------------+--------------------+
only showing top 20 rows



In [32]:
for item  in  result.collect():
    print(item.city,item.avgrain)
    sql="INSERT INTO rain(city,avgrain)values (%s,%s)"

    data=(item.city,item.avgrain)
    cursor.execute(sql,data)
    conn.commit()

浙江杭州 0.0
浙江温州 0.0003333333333333334
辽宁朝阳 0.0
香港新界 0.0
贵州遵义 0.0
四川达州 0.0015000000000000002
河北沧州 0.0
吉林船营 0.0
福建泉州 0.02457142857142857
云南玉溪 0.0008
甘肃兰州 0.0
广东潮州 0.034
广东佛山 0.0
青海海北 0.0
甘肃庆阳 0.00044444444444444447
四川绵阳 0.0
云南楚雄 0.0148
吉林昌邑 0.0
湖南永州 0.0
新疆可克达拉 0.0
海南定安 0.0
山东德州 0.0
山东临沂 0.0
湖北武汉 0.0
河北雄安新区 0.0
宁夏固原 0.0
江苏盐城 0.0
澳门氹仔岛 0.0
黑龙江七台河 0.0
新疆石河子 0.0
广东云浮 0.0006666666666666668
广东东莞 0.0
甘肃酒泉 0.0
浙江绍兴 0.0
湖南常德 0.0
湖北潜江 0.0
吉林辽源 0.0
甘肃金昌 0.0
贵州毕节 0.0013333333333333335
云南西双版纳 0.0
黑龙江佳木斯 0.0029090909090909093
湖北咸宁 0.0
江苏南京 0.0
安徽黄山 0.00044444444444444447
贵州黔西南 0.0
山西阳泉 0.0
贵州黔东南 0.0
新疆伊犁 0.0013333333333333335
青海西宁 0.0
甘肃定西 0.0
四川阿坝 0.0006153846153846154
辽宁抚顺 0.0
吉林长春 0.00036363636363636367
吉林磐石 0.0
辽宁辽阳 0.0
山东威海 0.0
河南三门峡 0.0
江苏常州 0.0
广西百色 0.0
山西晋城 0.0
宁夏中卫 0.0
广西梧州 0.0
江苏镇江 0.0
河南新乡 0.0
安徽亳州 0.0
湖南衡阳 0.0
台湾台中 0.0
四川凉山 0.0
广东揭阳 0.01
广西玉林 0.0
吉林城区 0.0
山东潍坊 0.0
广东江门 0.0
湖北恩施 0.0005
云南曲靖 0.0
广西贺州 0.0
新疆五家渠 0.0
宁夏银川 0.0
浙江宁波 0.0003333333333333334
江西九江 0.0005
广东河源 0.0
海南白沙 0.0
陕西商洛 0.0
辽宁锦州 

In [34]:
###全国平均气温最高的前20座城市
df.createOrReplaceTempView("weather")
##显示前20条数据
result=spark.sql("select city, avg(temperature) as avgtemperature from weather group by city order by avgtemperature desc limit 20")
result.show()

+----------+------------------+
|      city|    avgtemperature|
+----------+------------------+
|  海南保亭|             22.44|
|  海南陵水|             22.44|
|  台湾高雄|             21.44|
|  海南三亚|            20.704|
|  海南万宁|             20.52|
|  海南三沙|20.333333333333332|
|  台湾台中|             20.24|
|  海南乐东|             20.16|
|海南五指山|             20.16|
|  海南昌江|             20.16|
|  海南琼海|             20.12|
|  海南东方|             20.08|
|  海南临高|             19.36|
|  海南澄迈|             19.36|
|  广东茂名|19.354285714285716|
|  海南文昌|             19.24|
|  海南屯昌|             19.12|
|  广东湛江|            18.972|
|  海南白沙|             18.96|
|  海南海口|             18.92|
+----------+------------------+



In [35]:
for item  in  result.collect():
    print(item.city,item.avgtemperature)
    sql="INSERT INTO maxNtemperature(city,avgtemperature)values (%s,%s)"

    data=(item.city,item.avgtemperature)
    cursor.execute(sql,data)
    conn.commit()

海南保亭 22.44
海南陵水 22.44
台湾高雄 21.44
海南三亚 20.704
海南万宁 20.52
海南三沙 20.333333333333332
台湾台中 20.24
海南乐东 20.16
海南五指山 20.16
海南昌江 20.16
海南琼海 20.12
海南东方 20.08
海南临高 19.36
海南澄迈 19.36
广东茂名 19.354285714285716
海南文昌 19.24
海南屯昌 19.12
广东湛江 18.972
海南白沙 18.96
海南海口 18.92


In [38]:
###全国平均气温最低的前20座城市
df.createOrReplaceTempView("weather")
##显示前20条数据
result=spark.sql("select city, avg(temperature) as avgtemperature from weather group by city order by avgtemperature ASC limit 20")
result.show()

+--------------+-------------------+
|          city|     avgtemperature|
+--------------+-------------------+
|黑龙江大兴安岭|-32.446666666666665|
|    黑龙江黑河| -30.21142857142857|
|    黑龙江伊春|-30.070588235294117|
|内蒙古呼伦贝尔| -28.88705882352941|
|    黑龙江鹤岗| -25.25777777777778|
|    黑龙江绥化| -24.64727272727273|
|  黑龙江佳木斯|              -24.0|
|黑龙江齐齐哈尔| -23.20470588235294|
|  黑龙江双鸭山|-22.244444444444444|
|  黑龙江哈尔滨|-21.225263157894737|
|  黑龙江七台河|            -21.128|
|      吉林舒兰|             -21.04|
|      吉林蛟河|             -20.96|
|    黑龙江鸡西|            -20.512|
|    黑龙江大庆|            -20.488|
|  黑龙江牡丹江|-20.363636363636363|
|  内蒙古兴安盟|            -19.616|
|      吉林白城|             -19.38|
|      吉林永吉|             -19.32|
|      吉林桦甸|             -19.32|
+--------------+-------------------+



In [39]:
for item  in  result.collect():
    print(item.city,item.avgtemperature)
    sql="INSERT INTO minNtemperature(city,avgtemperature)values (%s,%s)"

    data=(item.city,item.avgtemperature)
    cursor.execute(sql,data)
    conn.commit()

黑龙江大兴安岭 -32.446666666666665
黑龙江黑河 -30.21142857142857
黑龙江伊春 -30.070588235294117
内蒙古呼伦贝尔 -28.88705882352941
黑龙江鹤岗 -25.25777777777778
黑龙江绥化 -24.64727272727273
黑龙江佳木斯 -24.0
黑龙江齐齐哈尔 -23.20470588235294
黑龙江双鸭山 -22.244444444444444
黑龙江哈尔滨 -21.225263157894737
黑龙江七台河 -21.128
吉林舒兰 -21.04
吉林蛟河 -20.96
黑龙江鸡西 -20.512
黑龙江大庆 -20.488
黑龙江牡丹江 -20.363636363636363
内蒙古兴安盟 -19.616
吉林白城 -19.38
吉林永吉 -19.32
吉林桦甸 -19.32


In [36]:
###全国平均降雨量最多的前20座城市
df.createOrReplaceTempView("weather")
##显示前20条数据
result=spark.sql("select city,avg(rain) as avgrain from weather group by city order by avgrain desc limit 20")
result.show()

+----------+--------------------+
|      city|             avgrain|
+----------+--------------------+
|  云南保山| 0.13666666666666666|
|  云南临沧| 0.07066666666666666|
|  新疆塔城|  0.0662857142857143|
|  安徽滁州| 0.04177777777777778|
|  云南普洱|0.034666666666666665|
|  广东潮州|               0.034|
|  福建平潭|               0.032|
|  海南乐东|0.027999999999999997|
|海南五指山|0.027999999999999997|
|  福建泉州| 0.02457142857142857|
|  海南三亚|0.022400000000000003|
|  福建福州|0.016923076923076923|
|  云南德宏|               0.016|
|  云南楚雄|              0.0148|
|  广东汕头|0.014500000000000004|
|  福建漳州|0.013000000000000006|
|  青海玉树|0.012666666666666668|
|  云南大理|0.011333333333333332|
|  福建厦门|0.010857142857142859|
|  西藏那曲| 0.01076923076923077|
+----------+--------------------+



In [37]:
for item  in  result.collect():
    print(item.city,item.avgrain)
    sql="INSERT INTO maxNrain(city,avgrain)values (%s,%s)"

    data=(item.city,item.avgrain)
    cursor.execute(sql,data)
    conn.commit()

云南保山 0.13666666666666666
云南临沧 0.07066666666666666
新疆塔城 0.0662857142857143
安徽滁州 0.04177777777777778
云南普洱 0.034666666666666665
广东潮州 0.034
福建平潭 0.032
海南乐东 0.027999999999999997
海南五指山 0.027999999999999997
福建泉州 0.02457142857142857
海南三亚 0.022400000000000003
福建福州 0.016923076923076923
云南德宏 0.016
云南楚雄 0.0148
广东汕头 0.014500000000000004
福建漳州 0.013000000000000006
青海玉树 0.012666666666666668
云南大理 0.011333333333333332
福建厦门 0.010857142857142859
西藏那曲 0.01076923076923077


In [40]:
###全国平均降雨量最少的前20座城市
df.createOrReplaceTempView("weather")
##显示前20条数据
result=spark.sql("select city,avg(rain) as avgrain from weather group by city order by avgrain ASC limit 20")
result.show()

+------------+-------+
|        city|avgrain|
+------------+-------+
|    海南定安|    0.0|
|    山东临沂|    0.0|
|云南西双版纳|    0.0|
|    湖北武汉|    0.0|
|    辽宁朝阳|    0.0|
|河北雄安新区|    0.0|
|    贵州遵义|    0.0|
|    宁夏固原|    0.0|
|    吉林船营|    0.0|
|    江苏盐城|    0.0|
|    甘肃兰州|    0.0|
|  澳门氹仔岛|    0.0|
|    青海海北|    0.0|
|黑龙江七台河|    0.0|
|    吉林昌邑|    0.0|
|    甘肃金昌|    0.0|
|  新疆石河子|    0.0|
|新疆可克达拉|    0.0|
|    广东东莞|    0.0|
|    山东德州|    0.0|
+------------+-------+



In [41]:
for item  in  result.collect():
    print(item.city,item.avgrain)
    sql="INSERT INTO minNrain(city,avgrain)values (%s,%s)"

    data=(item.city,item.avgrain)
    cursor.execute(sql,data)
    conn.commit()

宁夏固原 0.0
山东临沂 0.0
河北雄安新区 0.0
贵州遵义 0.0
山东德州 0.0
辽宁朝阳 0.0
湖北武汉 0.0
浙江杭州 0.0
吉林船营 0.0
河北沧州 0.0
新疆可克达拉 0.0
甘肃兰州 0.0
广东佛山 0.0
香港新界 0.0
海南定安 0.0
青海海北 0.0
四川绵阳 0.0
吉林昌邑 0.0
湖南永州 0.0
江苏盐城 0.0
