In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [3]:
from pyspark.sql import Row,SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.sql.types import *
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [4]:
spark = SparkSession.builder.appName("MyApp2") \
            .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.10.0") \
            .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven") \
            .getOrCreate()

In [5]:
df = spark.read.format('csv').load('all_cars_ml.csv', header=True)

In [67]:
df.printSchema()

root
 |-- amount: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- model: string (nullable = true)
 |-- generation: string (nullable = true)
 |-- year: string (nullable = true)
 |-- engine_capacity: string (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- transmission_type: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- drive_type: string (nullable = true)
 |-- color: string (nullable = true)
 |-- mileage_km: string (nullable = true)
 |-- condition: string (nullable = true)

+-------+-----+-----+---------------+----+---------------+--------------------+-----------------+-----------------+--------------------+-----------+----------+----------+
| amount|brand|model|     generation|year|engine_capacity|         engine_type|transmission_type|        body_type|          drive_type|      color|mileage_km| condition|
+-------+-----+-----+---------------+----+---------------+--------------------+-----------------+-----------------+-----

In [7]:
df.show()

+-------+-----+-----+---------------+----+---------------+--------------------+-----------------+-----------------+--------------------+-----------+----------+----------+
| amount|brand|model|     generation|year|engine_capacity|         engine_type|transmission_type|        body_type|          drive_type|      color|mileage_km| condition|
+-------+-----+-----+---------------+----+---------------+--------------------+-----------------+-----------------+--------------------+-----------+----------+----------+
| 4400.0|Acura|   TL|II · Рестайлинг|2002|            3.2|              бензин|          автомат|            седан|     передний привод|     чёрный|    313821|с пробегом|
| 7300.0|Acura|   TL|            III|2004|            3.2|              бензин|          автомат|            седан|     передний привод|      белый|    230000|с пробегом|
|36900.0|Acura|  RDX|            III|2019|            2.0|              бензин|          автомат|внедорожник 5 дв.|     передний привод|     чёрн

In [8]:
stringIndexerBrand = StringIndexer(inputCol='brand', outputCol = "brandIndex").setHandleInvalid("skip")
encoder_brand = OneHotEncoder(inputCol="brandIndex", outputCol="brandVec")

stringIndexerModel = StringIndexer(inputCol='model', outputCol = "modelIndex").setHandleInvalid("skip")
encoder_model = OneHotEncoder(inputCol="modelIndex", outputCol="modelVec")

stringIndexerGeneration = StringIndexer(inputCol='generation', outputCol = "generationIndex").setHandleInvalid("skip")
encoder_generation = OneHotEncoder(inputCol="generationIndex", outputCol="generationVec")

stringIndexerEngine_type = StringIndexer(inputCol='engine_type', outputCol = "engine_typeIndex").setHandleInvalid("skip")
encoder_engine_type = OneHotEncoder(inputCol="engine_typeIndex", outputCol="engine_typeVec")

stringIndexerTransmission_type = StringIndexer(inputCol='transmission_type', outputCol = "transmission_typeIndex").setHandleInvalid("skip")
encoder_transmission_type = OneHotEncoder(inputCol="transmission_typeIndex", outputCol="transmission_typeVec")

stringIndexerBody_type = StringIndexer(inputCol='body_type', outputCol = "body_typeIndex").setHandleInvalid("skip")
encoder_body_type = OneHotEncoder(inputCol="body_typeIndex", outputCol="body_typeVec")

stringIndexerDrive_type = StringIndexer(inputCol='drive_type', outputCol = "drive_typeIndex").setHandleInvalid("skip")
encoder_drive_type = OneHotEncoder(inputCol="drive_typeIndex", outputCol="drive_typeVec")

stringIndexerColor = StringIndexer(inputCol='color', outputCol = "colorIndex").setHandleInvalid("skip")
encoder_color = OneHotEncoder(inputCol="colorIndex", outputCol="colorVec")

stringIndexerCondition = StringIndexer(inputCol='condition', outputCol = "conditionIndex").setHandleInvalid("skip")
encoder_condition = OneHotEncoder(inputCol="conditionIndex", outputCol="conditionVec")

stringIndexerYear = StringIndexer(inputCol='year', outputCol = "yearIndex").setHandleInvalid("skip")
stringIndexerEngine_capacity = StringIndexer(inputCol='engine_capacity', outputCol = "engine_capacityIndex").setHandleInvalid("skip")
stringIndexerMileage_km = StringIndexer(inputCol='mileage_km', outputCol = "mileage_kmIndex").setHandleInvalid("skip")



assembler = VectorAssembler(
    inputCols=["brandVec", "modelVec", 'generationVec', 'engine_typeVec',
              'transmission_typeVec', 'body_typeVec', 'drive_typeVec', 'colorVec',
              'conditionVec', 'yearIndex', 'engine_capacityIndex', 'mileage_kmIndex'], outputCol="features")


preproc = Pipeline(stages=[
    stringIndexerBrand,
    encoder_brand,
    stringIndexerModel,
    encoder_model,
    stringIndexerGeneration,
    encoder_generation,
    stringIndexerEngine_type,
    encoder_engine_type,
    stringIndexerTransmission_type,
    encoder_transmission_type,
    stringIndexerBody_type,
    encoder_body_type,
    stringIndexerDrive_type,
    encoder_drive_type,
    stringIndexerColor,
    encoder_color,
    stringIndexerCondition,
    encoder_condition,
    stringIndexerYear,
    stringIndexerEngine_capacity,
    stringIndexerMileage_km,
    assembler
])

In [9]:
preproc = preproc.fit(df)

train, test = df.randomSplit([0.85, 0.15], seed=1)
train_proc = preproc.transform(train).cache()
test_proc = preproc.transform(test).cache()

In [10]:
srcDataSchema = [
    StructField("features", VectorUDT()),
    StructField("label", DoubleType())
]

In [11]:
trainData = train_proc.rdd.map(lambda x: Row(x.features, float(x.amount)))
trainDf = spark.createDataFrame(trainData, StructType(srcDataSchema))

evalData = test_proc.rdd.map(lambda x: Row(x.features, float(x.amount)))
evalDf = spark.createDataFrame(evalData, StructType(srcDataSchema))

In [12]:
from synapse.ml.lightgbm import LightGBMRegressor

In [100]:
model = LightGBMRegressor(
    objective="poisson", learningRate=0.1, numLeaves=32, numIterations=1000
).fit(trainDf)

In [101]:
print(model.getFeatureImportances())

[198.0, 228.0, 172.0, 205.0, 62.0, 98.0, 116.0, 47.0, 102.0, 248.0, 48.0, 68.0, 148.0, 117.0, 82.0, 26.0, 115.0, 64.0, 78.0, 45.0, 123.0, 56.0, 109.0, 71.0, 10.0, 91.0, 32.0, 42.0, 31.0, 30.0, 23.0, 128.0, 40.0, 0.0, 86.0, 34.0, 0.0, 40.0, 32.0, 41.0, 69.0, 0.0, 0.0, 22.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 81.0, 19.0, 60.0, 24.0, 79.0, 27.0, 11.0, 40.0, 37.0, 37.0, 20.0, 17.0, 59.0, 36.0, 63.0, 26.0, 33.0, 17.0, 124.0, 16.0, 2.0, 31.0, 26.0, 5.0, 33.0, 21.0, 9.0, 21.0, 24.0, 47.0, 38.0, 84.0, 21.0, 10.0, 71.0, 31.0, 56.0, 44.0, 22.0, 52.0, 10.0, 40.0, 42.0, 79.0, 51.0, 5.0, 13.0, 10.0, 37.0, 11.0, 48.0, 48.0, 60.0, 28.0, 11.0, 47.0, 8.0, 35.0, 7.0, 2.0, 46.0, 7.0, 33.0, 14.0, 29.0, 32.0, 1.0, 4.0, 30.0, 25.0, 34.0, 28.0, 56.0, 16.0, 8.0, 1.0, 9.0, 23.0, 12.0, 9.0, 47.0, 1.0, 33.0, 30.0, 2.0, 0.0, 68.0, 0.0, 0.0, 7.0, 0.0, 29.0, 20.0, 4.0, 20.0, 0.0, 30.0, 58.0, 20.0, 17.0, 9.0, 5.0, 37.0, 17.0, 0.0, 26.0, 12.0, 28.0, 0.0, 0.0, 7.0, 28.0, 13.0, 47.0, 0.0, 6.0, 17.0, 48.0

In [102]:
scoredData = model.transform(evalDf)
scoredData.show(100)

+--------------------+--------+------------------+
|            features|   label|        prediction|
+--------------------+--------+------------------+
|(1571,[28,162,850...|  1000.0| 1222.014426756741|
|(1571,[1,64,808,1...|  1000.0| 1120.187126553311|
|(1571,[1,64,808,1...|  1000.0|1311.7783961669697|
|(1571,[1,64,808,1...|  1000.0|  1122.70701232884|
|(1571,[1,77,787,1...|  1000.0|1336.4539817987784|
|(1571,[1,77,787,1...|  1000.0| 1350.184216723548|
|(1571,[1,77,787,1...|  1000.0|1324.0458457776278|
|(1571,[0,54,811,1...|  1000.0|2484.7643934137454|
|(1571,[10,78,818,...|  1000.0| 807.8086811499552|
|(1571,[10,78,807,...|  1000.0|1530.4681474280533|
|(1571,[10,111,779...|  1000.0|1232.8222284319716|
|(1571,[27,272,779...|  1000.0|2304.2252793511598|
|(1571,[27,244,928...|  1000.0| 1320.308054616156|
|(1571,[23,253,778...|  1000.0|2541.8354600981334|
|(1571,[6,61,780,1...|  1000.0| 1695.189721835564|
|(1571,[6,61,780,1...|  1000.0| 1692.476938718752|
|(1571,[6,61,780,1...|  1000.0|

In [103]:
from synapse.ml.train import ComputeModelStatistics

metrics = ComputeModelStatistics(
    evaluationMetric="regression", labelCol="label", scoresCol="prediction"
).transform(scoredData)
print(metrics)

DataFrame[mean_squared_error: double, root_mean_squared_error: double, R^2: double, mean_absolute_error: double]


In [104]:
metrics.show()

+-------------------+-----------------------+-----------------+-------------------+
| mean_squared_error|root_mean_squared_error|              R^2|mean_absolute_error|
+-------------------+-----------------------+-----------------+-------------------+
|3.202183440979129E7|      5658.783827801809|0.896040547067853| 2001.0208085686224|
+-------------------+-----------------------+-----------------+-------------------+

