<a href="https://colab.research.google.com/github/gnonname/Big-Data/blob/main/SparkMachinelearningCdata.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>




---

✅::::::

## My first Linear Regression model with PySpark
▶
---



## Installation steps

In [None]:
# Install a headless Java 8 JDK quietly (output discarded); this takes a few minutes.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


In [None]:
!pip install pyspark py4j ### Required by the PySpark cells below

In [None]:
!wget -q https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark
import os
import findspark
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"
findspark.init()

## Required libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.shell import spark

In [None]:
# Create (or reuse) the session for the linear-regression section and bind it
# to `spark` as well: the cells below read data through `spark`, not `Session`,
# so they should not depend on the side-effect import of `pyspark.shell`.
Session = SparkSession.builder.appName('lm').getOrCreate()
spark = Session


In [None]:
# Load the e-commerce CSV; the first row holds column names and column types
# are inferred from the values.
data = spark.read.csv(
    "/content/ecommerce.csv",
    header=True,
    inferSchema=True,
)

In [None]:
data.printSchema()

In [None]:
data.head()

In [None]:
data.show()

In [None]:
data.describe().show()

## Set up my data for the future model

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
# Predictor columns, in the order they are packed into the feature vector.
coef_var = [
    "Avg Session Length",
    "Time on App",
    "Time on Website",
    "Length of Membership",
]
# Combine the predictor columns into a single vector column named "features".
assembler = VectorAssembler(inputCols=coef_var, outputCol="features")

In [None]:
output = assembler.transform(data)

In [None]:
Final_Data=output.select("features", "Yearly Amount Spent")


In [None]:
# 70/30 train/test split. The seed is fixed so the split, and every metric
# computed from it, is reproducible; it matches the seeded splits used in the
# other sections of this notebook.
MyTrain, Mytest = Final_Data.randomSplit([0.7, 0.3], seed=1998)

In [None]:
Mytest.describe().show()

In [None]:
MyTrain.describe().show()

In [None]:
from pyspark.ml.regression import LinearRegression

In [None]:
# Linear regression estimator: vector column "features" in, yearly spend as target.
lm = LinearRegression(
    featuresCol="features",
    labelCol="Yearly Amount Spent",
)

In [None]:
model = lm.fit(MyTrain)

In [None]:
import pandas as pd

In [None]:
# Tabulate the fitted coefficients, one row per predictor listed in coef_var.
pd.DataFrame({"Coefficients":model.coefficients}, index = coef_var)

In [None]:
result =model.evaluate(Mytest)

In [None]:
result.residuals.show()

In [None]:
# Keep only the feature vectors (no label column) to score with the fitted model.
unlabeled_data = Mytest.select('features')

In [None]:
predictions = model.transform(unlabeled_data)

In [None]:
predictions.show()

In [None]:
# Report the test-set metrics held by `result`, each followed by a rule line.
print("**********  Model ************\n\n")

rule = "------------------------------\n\n"
metric_attrs = (
    ("MAE:", "meanAbsoluteError"),
    ("MSE:", "meanSquaredError"),
    ("RMSE:", "rootMeanSquaredError"),
    ("R 2:", "r2"),
    ("R 2 ajusté:", "r2adj"),
)
for caption, attr_name in metric_attrs:
    # Look each metric up just before printing it, one at a time.
    print(caption, getattr(result, attr_name))
    print(rule)

## K-Means

In [None]:
!curl https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_kmeans_data.txt >> sample_kmeans_data.txt

In [None]:
!curl https://archive.ics.uci.edu/ml/machine-learning-databases/00236/seeds_dataset.txt >> seeds_dataset.txt

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [None]:
# Session handle for the K-Means section (SparkSession was imported in an earlier cell).
spark = (
    SparkSession.builder
    .appName("myKmensCluster")
    .getOrCreate()
)

In [None]:
# Read the downloaded K-Means sample, which is stored in libsvm format.
df = spark.read.load("sample_kmeans_data.txt", format="libsvm")

In [None]:
df.printSchema()

In [None]:
df.head(3)

In [None]:
df.show()

In [None]:
# K-Means with two clusters and a fixed random seed, fitted on the sample data.
kmeans = KMeans(k=2, seed=1998)
model = kmeans.fit(df)

In [None]:
pred = model.transform(df)

In [None]:
evaluation = ClusteringEvaluator()

In [None]:
# Score the clustered rows with the evaluator built above and report the result.
# NOTE(review): the evaluator is constructed with no arguments, so the label
# below assumes its defaults are silhouette / squared Euclidean — confirm.
silhouette = evaluation.evaluate(pred)
print(f"Silhouette with squared euclidean distance: {silhouette}")

In [None]:
# List the centre of every cluster found by the fitted K-Means model.
centers = model.clusterCenters()
print("Cluster Centers:", "=================", sep="\n")
for centroid in centers:
    print(centroid)



---


## Random Forest with data sample_libsvm_data.txt ⛹

---



In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession

In [None]:
#from pyspark.sql import SparkSession
spark_S = SparkSession.builder.appName("Forest").getOrCreate()

In [None]:
# Load the libsvm-formatted sample used by the random-forest and boosting sections.
# NOTE(review): no visible cell downloads sample_libsvm_data.txt (only
# sample_kmeans_data.txt and seeds_dataset.txt are fetched) — confirm the file
# is present in the working directory before running.
df_libsvm = spark_S.read.format("libsvm").load("sample_libsvm_data.txt")

In [None]:
df_libsvm.show()

In [None]:
df_libsvm.printSchema()

In [None]:
df_libsvm.show()

## Train/test splitting step

In [None]:
# Seeded 75/25 train/test split of the libsvm sample.
Mytrain, Mytest = df_libsvm.randomSplit(weights=[0.75, 0.25], seed=1998)

In [None]:
# Preview both partitions, test first, each under its own banner.
previews = (
    ("\n Test sample :++++++++++", Mytest),
    ("\n Train sample :++++++++++", Mytrain),
)
for banner, partition in previews:
    print(banner)
    partition.show()


## Modelling step

In [None]:
# Random forest of 20 trees with a fixed seed, on the standard label/features columns.
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=20,
    seed=1998,
)

In [None]:
Mymodel = rf.fit(Mytrain)

In [None]:
pred = Mymodel.transform(Mytest)

In [None]:
pred.printSchema()

In [None]:
pred.select("prediction", "label", "features").show(5)

## Evaluation step

In [None]:
# Accuracy evaluator comparing the "prediction" column against "label".
# The name `eval` (which hides the builtin) is kept because the next cell
# calls `eval.evaluate(pred)`.
eval = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)

In [None]:
acc = eval.evaluate(pred)

In [None]:
# Test error is the complement of accuracy; printed in %g (general) notation.
print(f"Test Error = {1.0 - acc:g}")

In [None]:
Mymodel.featureImportances



---

## Boosting With Spark for ML
---




In [None]:
from pyspark.ml.classification import GBTClassifier

In [None]:
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10, seed=42)

In [None]:
model = gbt.fit(Mytrain)

In [None]:
pred = model.transform(Mytest)

In [None]:
pred.select("prediction", "label", "features").show(5)

In [None]:
# Evaluate the boosted model's accuracy on the test predictions.
# A dedicated name is used instead of rebinding the builtin `eval`.
gbt_evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
acc = gbt_evaluator.evaluate(pred)
# Test error is the complement of accuracy.
print(f"Test Error = {1.0 - acc:g}")

## Tree Methods with PySpark
1. Single Decision Tree
1. Random Forest
1. Gradient Boosted Tree Classifier

In [None]:
from pyspark.sql import SparkSession
spark_tree = SparkSession.builder.appName("trees").getOrCreate()

In [None]:
# Read the college CSV (header row, inferred types) through the session created
# for this section, rather than through the `spark` handle left over from the
# K-Means section.
df_tree = spark_tree.read.csv("/content/college.csv", inferSchema=True, header=True)

In [None]:
df_tree.printSchema()

In [None]:
df_tree.head(2)

# Formatting for Spark

In [None]:
# "label", "features"
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [None]:
df_tree.printSchema()

In [None]:
df_tree.columns

In [None]:
# Numeric college attributes packed, in this order, into the "features" vector.
feature_columns = [
    "Apps",
    "Accept",
    "Enroll",
    "Top10perc",
    "Top25perc",
    "F_Undergrad",
    "P_Undergrad",
    "Outstate",
    "Room_Board",
    "Books",
    "Personal",
    "PhD",
    "Terminal",
    "S_F_Ratio",
    "perc_alumni",
    "Expend",
    "Grad_Rate",
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

In [None]:
output = assembler.transform(df_tree)

# String Variables (Private)

In [None]:
from pyspark.ml.feature import StringIndexer

In [None]:
# Encode the string column "Private" as a numeric index column, "PrivateIndexer".
indexer = StringIndexer(inputCol="Private", outputCol="PrivateIndexer")
indexer_model = indexer.fit(output)
output_fixed = indexer_model.transform(output)

In [None]:
df_final = output_fixed.select("features", "PrivateIndexer")

In [None]:
train, test = df_final.randomSplit([0.7, 0.3], seed=1998)

# Tree Classifiers

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml import Pipeline

## CREATE Models

In [None]:
# All three tree-based classifiers share the same label and feature columns.
column_args = {"labelCol": "PrivateIndexer", "featuresCol": "features"}
dtc = DecisionTreeClassifier(**column_args)
rfc = RandomForestClassifier(**column_args)
gbt = GBTClassifier(**column_args)

In [None]:
# Fit each classifier on the training split, in declaration order.
dtc_model, rfc_model, gbt_model = [
    classifier.fit(train) for classifier in (dtc, rfc, gbt)
]

# Predictions

In [None]:
# Score the held-out split with each fitted model.
dtc_pred, rfc_pred, gbt_pred = [
    fitted.transform(test) for fitted in (dtc_model, rfc_model, gbt_model)
]

# Eval

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="PrivateIndexer", predictionCol="prediction", metricName="accuracy")

In [None]:
# Accuracy of each model's predictions, computed with the shared evaluator.
dtc_acc, rfc_acc, gbt_acc = [
    evaluator.evaluate(scored) for scored in (dtc_pred, rfc_pred, gbt_pred)
]

In [None]:
# Print the three accuracies, framed and separated by a ten-dash rule.
rule = "-" * 10
print(rule)
for tag, score in (("DT", dtc_acc), ("RFC", rfc_acc), ("GBT", gbt_acc)):
    print(f"{tag} Acc: {score}")
    print(rule)


---

### Thanks. Best regards, Cedarta DONOU


---

