In [1]:
import pandas as pd
import numpy as np
import random
import seaborn as sns
import matplotlib.pyplot as plt 
import itertools
from pandas.plotting import scatter_matrix

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, ConfusionMatrixDisplay, plot_confusion_matrix

from pyspark.sql import SparkSession
from pyspark.sql.functions import col,isnan, when, count, mean as _mean, stddev as _stddev, col
from pyspark.sql.types import DoubleType, IntegerType

from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler, VectorIndexer, IndexToString, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


ModuleNotFoundError: No module named 'pyspark'

In [None]:
# Start (or reuse) a Spark session whose driver is bound to the loopback address.
spark = (
    SparkSession.builder
    .appName('TestMinadzb')
    .config("spark.driver.bindAddress", "127.0.0.1")
    .getOrCreate()
)

In [None]:
# Load the dataset, using the first line of the file as column names.
csv = spark.read.csv("dataset.csv", header=True)

In [None]:
# Peek at the first row of the freshly loaded data.
csv.head()

In [None]:
# For every column, count the rows whose value is NaN or null.
missing_counts = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in csv.columns
]
csv.select(missing_counts).show()

In [None]:
# Columns excluded from the analysis, removed in a single call.
dropped_columns = (
    'url',
    'cid',
    'id',
    'Lng',
    'Lat',
    'DOM',
    'tradeTime',
    'price',
    'floor',             # (chinese characters)
    'buildingType',
    'communityAverage',
    'constructionTime',  # (chinese characters)
)
csv = csv.drop(*dropped_columns)

In [None]:
# Discard every row that contains a null in any of the remaining columns.
csv = csv.na.drop()

In [None]:
# describe() only builds a (lazy) summary DataFrame; show() is needed to actually
# compute and print the statistics instead of just the DataFrame's schema repr.
csv.describe().show()

In [None]:
# Print the column names and types of the current DataFrame.
csv.printSchema()

In [None]:
# Display the first rows of the cleaned data.
csv.show()

In [None]:


# The CSV is loaded without schema inference, so the columns need explicit numeric types.
to_int = ['followers', 'livingRoom', 'drawingRoom', 'kitchen',
          'bathRoom', 'renovationCondition', 'buildingStructure',
          'elevator', 'fiveYearsProperty', 'subway', 'district']
to_double = ['square', 'ladderRatio', 'totalPrice']

for name in to_int:
    csv = csv.withColumn(name, col(name).cast(IntegerType()))

for name in to_double:
    csv = csv.withColumn(name, col(name).cast(DoubleType()))

# With ANSI mode off, a string that cannot be parsed becomes null when cast. The earlier
# dropna() ran before the casts and cannot catch those, and VectorAssembler (used below)
# raises on null inputs by default, so drop such rows here.
csv = csv.dropna(subset=to_int + to_double)

In [None]:
# For categorization: splitting the price into high and low.
# Mean and standard deviation of totalPrice, computed in one aggregation.
df_stats = csv.select(
    _mean('totalPrice').alias('mean'),
    _stddev('totalPrice').alias('std'),
).collect()

mean, std = df_stats[0]['mean'], df_stats[0]['std']
mean

In [None]:
# For categorization: splitting the price into high and low.
# totalPrice becomes a binary label: 1 when the price is at least 350, otherwise 0.
csv2 = csv.withColumn(
    "totalPrice",
    when(col("totalPrice") >= 350, 1).otherwise(0),
)
csv2.show(n=15, truncate=False)

In [None]:
# Correlation.corr works on a single vector column, so pack every column into one first.
vector_col = "corr_features"
assembler = VectorAssembler(inputCols=csv.columns, outputCol=vector_col)
df_vector = assembler.transform(csv).select(vector_col)

# Correlation matrix (Pearson is the default method); the result is a one-row DataFrame
# whose single cell holds the matrix.
corr = Correlation.corr(df_vector, vector_col)

# Present the matrix as a labelled square table rather than a flat, unlabelled array
# of values, so each coefficient can be matched to its pair of columns.
corr_matrix = corr.head()["pearson({})".format(vector_col)].toArray()
pd.DataFrame(corr_matrix, index=csv.columns, columns=csv.columns)

# Klasyfikacja

# Przygotowanie danych

In [None]:
def get_dummy(df,categoricalCols,continuousCols,labelCol):
    """Build a two-column (features, label) DataFrame ready for Spark ML estimators.

    Each categorical column is string-indexed and then one-hot encoded; the encoded
    vectors are concatenated with the continuous columns into one ``features`` vector.

    Parameters:
        df: input Spark DataFrame.
        categoricalCols: names of the columns to index and one-hot encode.
        continuousCols: list of column names appended to the feature vector as-is.
        labelCol: name of the column copied into the output ``label`` column.

    Returns:
        A DataFrame containing only the ``features`` and ``label`` columns.
    """
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col

    index_stages = []
    encode_stages = []
    for name in categoricalCols:
        indexed_name = "{0}_indexed".format(name)
        encoded_name = "{0}_encoded".format(indexed_name)
        index_stages.append(StringIndexer(inputCol=name, outputCol=indexed_name))
        # OneHotEncoder keeps its default setting: dropLast=True
        encode_stages.append(OneHotEncoder(inputCol=indexed_name, outputCol=encoded_name))

    feature_inputs = [stage.getOutputCol() for stage in encode_stages] + continuousCols
    assemble_stage = VectorAssembler(inputCols=feature_inputs, outputCol="features")

    # All indexers run first, then all encoders, then the final assembly.
    stages = index_stages + encode_stages + [assemble_stage]
    fitted = Pipeline(stages=stages).fit(df)

    transformed = fitted.transform(df).withColumn('label', col(labelCol))
    return transformed.select('features', 'label')

In [None]:
# To be done for categorical data: these columns are one-hot encoded by get_dummy.
catcols = [
    'renovationCondition',
    'buildingStructure',
    'elevator',
    'fiveYearsProperty',
    'subway',
]

# Columns passed through to the feature vector unchanged.
num_cols = [
    'followers',
    'square',
    'livingRoom',
    'drawingRoom',
    'kitchen',
    'bathRoom',
    'ladderRatio',
    'district',
]
labelCol = 'totalPrice'

data = get_dummy(csv2, catcols, num_cols, labelCol)
data.show(n=5)

In [None]:

# Fit an indexer on the label column; the indexed column carries label metadata.
labelIndexer = StringIndexer(
    inputCol='label',
    outputCol='indexedLabel',
).fit(data)
labelIndexer.transform(data).show(n=5, truncate=True)

In [None]:
# Automatically identify categorical features, and index them.
# maxCategories=5: features with more than 5 distinct values are treated as continuous.
featureIndexer = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=5,
).fit(data)
featureIndexer.transform(data).show(n=6, truncate=True)

In [None]:
# Split the data into training and test sets (40% held out for testing).
# A fixed seed makes the random split repeatable, so the metrics reported below do not
# change from one run of the notebook to the next.
(trainingData, testData) = data.randomSplit([0.6, 0.4], seed=42)

trainingData.show(5,False)
testData.show(5,False)

# Regresja Logistyczna

In [None]:
# Logistic-regression classifier operating on the indexed label and feature columns.
logr = LogisticRegression(labelCol='indexedLabel', featuresCol='indexedFeatures')

# Translates numeric predictions back into the original label values.
labelConverter = IndexToString(
    inputCol="prediction",
    outputCol="predictedLabel",
    labels=labelIndexer.labels,
)

# Whole workflow as one pipeline: index labels, index features, classify, decode.
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, logr, labelConverter])

# Fitting the pipeline trains the classifier and also runs the indexer stages.
model = pipeline.fit(trainingData)

In [None]:
# Apply the fitted pipeline to the held-out test data.
predictions = model.transform(testData)
# Show a few example rows: input features, true label and decoded prediction.
predictions.select(["features", "label", "predictedLabel"]).show(n=5)

In [None]:
# MulticlassClassificationEvaluator is already imported in the notebook's import cell,
# so it is not re-imported here.
# Compare predictions with the true (indexed) labels and report the test error.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

In [None]:
# NOTE: `model` must still be the logistic-regression pipeline fitted above; the
# decision-tree section further down rebinds `model`, so run this cell before it.
# Stage order in that pipeline: labelIndexer, featureIndexer, logr, labelConverter.
lrModel = model.stages[2]
trainingSummary = lrModel.summary

print(trainingSummary)

# Receiver-operating characteristic (as a DataFrame) and the area under it.
trainingSummary.roc.show(5)
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

# Threshold that maximizes the F-Measure on the training data.
# head() (not head(5)) returns the single aggregated Row, which can be indexed by name.
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
    .select('threshold').head()['threshold']
print("bestThreshold: " + str(bestThreshold))

# Bring the ROC curve to the driver once instead of running a separate job per column.
roc_pdf = trainingSummary.roc.toPandas()
x = roc_pdf[["FPR"]]
y = roc_pdf[["TPR"]]

fig, ax = plt.subplots(figsize=(10,10))
ax.set_title("ROC")
ax.set_xlabel("False Positive Rate")
ax.set_ylabel("True Positive Rate")
ax.plot(x, y);  # trailing semicolon suppresses the list-of-lines repr in the output

# Drzewo Decyzyjne

In [None]:
# Decision-tree classifier operating on the indexed label and feature columns.
dTree = DecisionTreeClassifier(featuresCol='indexedFeatures', labelCol='indexedLabel')

# Translates numeric predictions back into the original label values.
labelConverter = IndexToString(
    inputCol="prediction",
    outputCol="predictedLabel",
    labels=labelIndexer.labels,
)

# Whole workflow as one pipeline: index labels, index features, classify, decode.
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dTree, labelConverter])

# Fitting the pipeline trains the tree and also runs the indexer stages.
model = pipeline.fit(trainingData)

In [None]:
# Apply the fitted pipeline to the held-out test data.
predictions = model.transform(testData)
# Show a few example rows: input features, true label and decoded prediction.
predictions.select(["features", "label", "predictedLabel"]).show(n=5)

In [None]:
# Compare predictions with the true (indexed) labels and report the test error.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="indexedLabel",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

# The fitted classifier is the second-to-last pipeline stage (the label converter is last).
rfModel = model.stages[-2]
print(rfModel)  # summary only