In [0]:
import pandas as pd
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator




In [0]:
%fs 
ls /mnt/seminario

path,name,size,modificationTime
dbfs:/mnt/seminario/CustomerAddress,CustomerAddress,4285,1685863239000
dbfs:/mnt/seminario/DimAddress,DimAddress,28557,1685863238000
dbfs:/mnt/seminario/DimCustomer,DimCustomer,70895,1685863237000
dbfs:/mnt/seminario/DimOrder,DimOrder,67405,1685863235000
dbfs:/mnt/seminario/DimProduct,DimProduct,30274,1685863233000
dbfs:/mnt/seminario/FactSales,FactSales,5997,1685863887000


In [0]:
df = pd.read_csv("/dbfs/mnt/seminario/DimOrder")

In [0]:
df.head(5)

Unnamed: 0,SalesOrderID,SalesOrderDetailID,OrderQty,ProductID,UnitPrice,UnitPriceDiscount,LineTotal,Status,AccountNumber,CustomerID,ShipMethod,SubTotal,TaxAmt,Freight,TotalDue,orderID
0,71774,110562,1,836,356.898,0.0,356.898,5,10-4020-000609,29847,CARGO TRANSPORT 5,880.3484,70.4279,22.0087,972.785,1
1,71774,110563,1,822,356.898,0.0,356.898,5,10-4020-000609,29847,CARGO TRANSPORT 5,880.3484,70.4279,22.0087,972.785,2
2,71776,110567,1,907,63.9,0.0,63.9,5,10-4020-000106,30072,CARGO TRANSPORT 5,78.81,6.3048,1.9703,87.0851,3
3,71780,110616,4,905,218.454,0.0,873.816,5,10-4020-000340,30113,CARGO TRANSPORT 5,38418.6895,3073.4952,960.4672,42452.6519,4
4,71780,110617,2,983,461.694,0.0,923.388,5,10-4020-000340,30113,CARGO TRANSPORT 5,38418.6895,3073.4952,960.4672,42452.6519,5


In [0]:
dfml = df[["OrderQty","UnitPrice","LineTotal","SubTotal","TaxAmt","TotalDue","ProductID"]]

In [0]:
dfml.head(5)

Unnamed: 0,OrderQty,UnitPrice,LineTotal,SubTotal,TaxAmt,TotalDue,ProductID
0,1,356.898,356.898,880.3484,70.4279,972.785,836
1,1,356.898,356.898,880.3484,70.4279,972.785,822
2,1,63.9,63.9,78.81,6.3048,87.0851,907
3,4,218.454,873.816,38418.6895,3073.4952,42452.6519,905
4,2,461.694,923.388,38418.6895,3073.4952,42452.6519,983


In [0]:
dfml["Target"] = [1 if s>=2  else 0 for s in dfml["OrderQty"]]

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dfml["Target"] = [1 if s>=2  else 0 for s in dfml["OrderQty"]]


In [0]:
dfml.head(5)

Unnamed: 0,OrderQty,UnitPrice,LineTotal,SubTotal,SubTotal.1,TaxAmt,TotalDue,ProductID,Target
0,1,356.898,356.898,880.3484,880.3484,70.4279,972.785,836,0
1,1,356.898,356.898,880.3484,880.3484,70.4279,972.785,822,0
2,1,63.9,63.9,78.81,78.81,6.3048,87.0851,907,0
3,4,218.454,873.816,38418.6895,38418.6895,3073.4952,42452.6519,905,1
4,2,461.694,923.388,38418.6895,38418.6895,3073.4952,42452.6519,983,1


In [0]:
df = spark.createDataFrame(dfml)

crt = ["OrderQty","UnitPrice","LineTotal","SubTotal","TaxAmt","TotalDue","ProductID"]

assembler = VectorAssembler(inputCols=crt,outputCol='features')

transform_data = assembler.transform(df)

transform_data.show(5)

+--------+---------+---------+----------+---------+----------+---------+------+--------------------+
|OrderQty|UnitPrice|LineTotal|  SubTotal|   TaxAmt|  TotalDue|ProductID|Target|            features|
+--------+---------+---------+----------+---------+----------+---------+------+--------------------+
|       1|  356.898|  356.898|  880.3484|  70.4279|   972.785|      836|     0|[1.0,356.898,356....|
|       1|  356.898|  356.898|  880.3484|  70.4279|   972.785|      822|     0|[1.0,356.898,356....|
|       1|     63.9|     63.9|     78.81|   6.3048|   87.0851|      907|     0|[1.0,63.9,63.9,78...|
|       4|  218.454|  873.816|38418.6895|3073.4952|42452.6519|      905|     1|[4.0,218.454,873....|
|       2|  461.694|  923.388|38418.6895|3073.4952|42452.6519|      983|     1|[2.0,461.694,923....|
+--------+---------+---------+----------+---------+----------+---------+------+--------------------+
only showing top 5 rows



In [0]:
#entrenamiento del modelo
(training_data, test_data) = transform_data.randomSplit([0.8,0.2])

In [0]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol='Target',
                          featuresCol='features',
                          maxDepth=5)

model = rf.fit(training_data)

In [0]:
predictions = model.transform(test_data)
predictions.show(10)

+--------+---------+---------+-----------+---------+----------+---------+------+--------------------+--------------------+--------------------+----------+
|OrderQty|UnitPrice|LineTotal|   SubTotal|   TaxAmt|  TotalDue|ProductID|Target|            features|       rawPrediction|         probability|prediction|
+--------+---------+---------+-----------+---------+----------+---------+------+--------------------+--------------------+--------------------+----------+
|       1|     63.9|     63.9|      78.81|   6.3048|   87.0851|      907|     0|[1.0,63.9,63.9,78...|[18.8333333333333...|[0.94166666666666...|       0.0|
|       1|  112.998|  67.7988| 38418.6895|3073.4952|42452.6519|      985|     0|[1.0,112.998,67.7...|[19.3333333333333...|[0.96666666666666...|       0.0|
|       1|  323.994|  323.994| 38418.6895|3073.4952|42452.6519|      990|     0|[1.0,323.994,323....|          [20.0,0.0]|           [1.0,0.0]|       0.0|
|       1|   445.41|   445.41|108561.8317|8684.9465|119960.824|      9

In [0]:
predictionspandas = predictions.select("*").toPandas()
predictionspandas

  Unable to convert the field features. If this column is not necessary, you may consider dropping it or converting to primitive type before the conversion.
Direct cause: Unsupported type in conversion to Arrow: VectorUDT()
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


Unnamed: 0,OrderQty,UnitPrice,LineTotal,SubTotal,TaxAmt,TotalDue,ProductID,Target,features,rawPrediction,probability,prediction
0,1,63.900,63.9000,78.8100,6.3048,87.0851,907,0,"[1.0, 63.9, 63.9, 78.81, 6.3048, 87.0851, 907.0]","[18.833333333333336, 1.1666666666666667]","[0.9416666666666667, 0.05833333333333333]",0.0
1,1,112.998,67.7988,38418.6895,3073.4952,42452.6519,985,0,"[1.0, 112.998, 67.7988, 38418.6895, 3073.4952,...","[19.333333333333336, 0.6666666666666666]","[0.9666666666666666, 0.033333333333333326]",0.0
2,1,323.994,323.9940,38418.6895,3073.4952,42452.6519,990,0,"[1.0, 323.994, 323.994, 38418.6895, 3073.4952,...","[20.0, 0.0]","[1.0, 0.0]",0.0
3,1,445.410,445.4100,108561.8317,8684.9465,119960.8240,959,0,"[1.0, 445.41, 445.41, 108561.8317, 8684.9465, ...","[20.0, 0.0]","[1.0, 0.0]",0.0
4,1,809.760,809.7600,38418.6895,3073.4952,42452.6519,743,0,"[1.0, 809.76, 809.76, 38418.6895, 3073.4952, 4...","[19.0, 1.0]","[0.95, 0.05]",0.0
...,...,...,...,...,...,...,...,...,...,...,...,...
110,5,818.700,4093.5000,98278.6910,7862.2953,108597.9536,748,1,"[5.0, 818.7, 4093.5, 98278.691, 7862.2953, 108...","[0.0, 20.0]","[0.0, 1.0]",1.0
111,5,1376.994,6884.9700,98278.6910,7862.2953,108597.9536,784,1,"[5.0, 1376.994, 6884.97, 98278.691, 7862.2953,...","[0.0, 20.0]","[0.0, 1.0]",1.0
112,6,1466.010,8796.0600,88812.8625,7105.0290,98138.2131,795,1,"[6.0, 1466.01, 8796.06, 88812.8625, 7105.029, ...","[0.0, 20.0]","[0.0, 1.0]",1.0
113,7,818.700,5730.9000,98278.6910,7862.2953,108597.9536,742,1,"[7.0, 818.7, 5730.9, 98278.691, 7862.2953, 108...","[0.0, 20.0]","[0.0, 1.0]",1.0


In [0]:
evaluator = BinaryClassificationEvaluator()  
evaluator.setRawPredictionCol('prediction')
evaluator.setLabelCol('Target')

Out[31]: BinaryClassificationEvaluator_2a004dae6a06

In [0]:
exactitud = evaluator.evaluate(predictions)
exactitud

Out[32]: 1.0