In [1]:
"""
python3.6 -m pip install pyspark

# Dataframe tutorial
https://www.guru99.com/pyspark-tutorial.html#3

#Tensorflow on spark training model: 
https://www.adaltas.com/en/2018/05/29/spark-tensorflow-2-3/

"""

import os
import time

import findspark
findspark.init()

# PySpark
import pyspark
from pyspark import SparkFiles
from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.sql import functions as F

# Location of the source CSV. Set the CORONA_CSV environment variable to point
# at a different copy; the default is the path this notebook was written with.
LOCAL_FILE = os.environ.get(
    "CORONA_CSV",
    "/home/consciencesai/consciencesai/data_science_developer/projects_source/pyspark_python/data/corona_full_data.csv",
)

# Spark config
conf = SparkConf().setAppName('CoronaSpark')
# getOrCreate() reuses an already-running context, so re-executing this cell
# in a live kernel does not fail on a second SparkContext construction.
sc = SparkContext.getOrCreate(conf=conf)
sc.addFile(LOCAL_FILE)

# Create a spark session
spark = SparkSession.builder.getOrCreate()

# Resolve the copy registered by sc.addFile(). The lookup key is derived from
# LOCAL_FILE so the file name is not duplicated as a second literal.
spark_file = SparkFiles.get(os.path.basename(LOCAL_FILE))

# Read the CSV directly into a Spark DataFrame (no pandas involved); the first
# row is the header and column types are inferred from the data.
df = spark.read.load(spark_file,
                     format="csv", inferSchema="true", header="true")

df.printSchema()
df.show(5, truncate = False)

root
 |-- date: string (nullable = true)
 |-- location: string (nullable = true)
 |-- new_cases: integer (nullable = true)
 |-- new_deaths: integer (nullable = true)
 |-- total_cases: integer (nullable = true)
 |-- total_deaths: integer (nullable = true)

+----------+-----------+---------+----------+-----------+------------+
|date      |location   |new_cases|new_deaths|total_cases|total_deaths|
+----------+-----------+---------+----------+-----------+------------+
|2019-12-31|Afghanistan|0        |0         |0          |0           |
|2020-01-01|Afghanistan|0        |0         |0          |0           |
|2020-01-02|Afghanistan|0        |0         |0          |0           |
|2020-01-03|Afghanistan|0        |0         |0          |0           |
|2020-01-04|Afghanistan|0        |0         |0          |0           |
+----------+-----------+---------+----------+-----------+------------+
only showing top 5 rows



In [6]:
# Timing run for the DataFrame-API filter: select the rows for Spain, then
# issue repeated take() calls and print the first row of each result.
N_TAKE_CALLS = 300  # how many take() calls the timing loop performs

start_time = time.time()

df_spain = df[df.location == "Spain"]

df_spain.printSchema()
df_spain.show(5, truncate = False)

for n_rows in range(N_TAKE_CALLS):
    # take(0) returns an empty list, so the first pass prints the placeholder.
    sql_result = df_spain.take(n_rows)
    first_result = sql_result[0] if sql_result else ["First", "Second"]
    print(first_result)
    print(type(first_result))
    print(first_result[0])
    print(first_result[1])
    print("*********************")

print("--- %s seconds ---" % (time.time() - start_time))

s=0)
<class 'pyspark.sql.types.Row'>
2019-12-31
Spain
*********************
Row(date='2019-12-31', location='Spain', new_cases=0, new_deaths=0, total_cases=0, total_deaths=0)
<class 'pyspark.sql.types.Row'>
2019-12-31
Spain
*********************
Row(date='2019-12-31', location='Spain', new_cases=0, new_deaths=0, total_cases=0, total_deaths=0)
<class 'pyspark.sql.types.Row'>
2019-12-31
Spain
*********************
Row(date='2019-12-31', location='Spain', new_cases=0, new_deaths=0, total_cases=0, total_deaths=0)
<class 'pyspark.sql.types.Row'>
2019-12-31
Spain
*********************
Row(date='2019-12-31', location='Spain', new_cases=0, new_deaths=0, total_cases=0, total_deaths=0)
<class 'pyspark.sql.types.Row'>
2019-12-31
Spain
*********************
Row(date='2019-12-31', location='Spain', new_cases=0, new_deaths=0, total_cases=0, total_deaths=0)
<class 'pyspark.sql.types.Row'>
2019-12-31
Spain
*********************
Row(date='2019-12-31', location='Spain', new_cases=0, new_deaths=0, total_

In [5]:
# Timing run for the SQL path: expose `df` to Spark SQL as the view "corona",
# select the rows for Spain, then issue repeated take() calls.
# createOrReplaceTempView is the supported replacement for registerTempTable
# (deprecated since Spark 2.0) and can be re-run without error.
df.createOrReplaceTempView("corona")

N_TAKE_CALLS = 300  # how many take() calls the timing loop performs

start_time = time.time()

sql_spain = """
    SELECT * from corona WHERE location="Spain"
            """

df_spain_2 = spark.sql(sql_spain)
print(type(df_spain_2))

for n_rows in range(N_TAKE_CALLS):
    # take(0) returns an empty list, so the first pass prints the placeholder.
    sql_result = df_spain_2.take(n_rows)
    first_result = sql_result[0] if sql_result else ["First", "Second"]
    print(first_result)
    print(type(first_result))
    print(first_result[0])
    print(first_result[1])
    print("*********************")

print("--- %s seconds ---" % (time.time() - start_time))

s=0)
<class 'pyspark.sql.types.Row'>
2019-12-31
Spain
*********************
Row(date='2019-12-31', location='Spain', new_cases=0, new_deaths=0, total_cases=0, total_deaths=0)
<class 'pyspark.sql.types.Row'>
2019-12-31
Spain
*********************
Row(date='2019-12-31', location='Spain', new_cases=0, new_deaths=0, total_cases=0, total_deaths=0)
<class 'pyspark.sql.types.Row'>
2019-12-31
Spain
*********************
Row(date='2019-12-31', location='Spain', new_cases=0, new_deaths=0, total_cases=0, total_deaths=0)
<class 'pyspark.sql.types.Row'>
2019-12-31
Spain
*********************
Row(date='2019-12-31', location='Spain', new_cases=0, new_deaths=0, total_cases=0, total_deaths=0)
<class 'pyspark.sql.types.Row'>
2019-12-31
Spain
*********************
Row(date='2019-12-31', location='Spain', new_cases=0, new_deaths=0, total_cases=0, total_deaths=0)
<class 'pyspark.sql.types.Row'>
2019-12-31
Spain
*********************
Row(date='2019-12-31', location='Spain', new_cases=0, new_deaths=0, total_

In [15]:
!pip install pandas
!pip install sklearn
!pip install matplotlib
!pip install Pillow

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


In [10]:
# Collect the Spain rows from Spark into a local pandas DataFrame.
normal_pd = df_spain_2.toPandas()
# Preview the first five rows.
normal_pd.head(n=5)

Unnamed: 0,date,location,new_cases,new_deaths,total_cases,total_deaths
0,2019-12-31,Spain,0,0,0,0
1,2020-01-01,Spain,0,0,0,0
2,2020-01-02,Spain,0,0,0,0
3,2020-01-03,Spain,0,0,0,0
4,2020-01-04,Spain,0,0,0,0


In [13]:
from sklearn.decomposition import PCA

# Drop the two string columns so only the integer count columns go into PCA.
# Rebinding (rather than inplace=True) together with errors="ignore" keeps this
# cell re-runnable: a second execution no longer raises KeyError because the
# columns were already removed.
normal_pd = normal_pd.drop(columns=["date", "location"], errors="ignore")

# Project the remaining columns onto their first two principal components.
pca_model = PCA(n_components=2)
values = pca_model.fit_transform(normal_pd)

In [17]:
# kmeans example
from pyspark.mllib.clustering import KMeans