In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
# Build (or reuse) the Spark session, pulling the MMLSpark package from the
# repository configured below.
spark = (
    SparkSession.builder
    .appName('abc')
    .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1")
    .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
    .getOrCreate()
)
spark

df = spark.read.parquet("s3://502-project-1/data")
df.printSchema()

# Rows with no `profit` value are dropped before anything else happens.
df = df.dropna(subset=['profit'])

from pyspark.sql.functions import isnan, count, when, col, isnull
# Show the per-column null counts, then replace the remaining nulls with 0.
null_counts = [count(when(col(c).isNull(), c)).alias(c) for c in df.columns]
df.select(null_counts).show()
df = df.fillna(0)

# Derive the binary target through Spark SQL: label is 1 when profit >= 0 and
# 0 when profit < 0. The frame is exposed as the temp view "dfTable" for the query.
df.createOrReplaceTempView("dfTable")
label_query = "select *, CASE \
                   when profit >= 0 then 1 \
                   when profit < 0 then 0 \
                   end as label from dfTable"
df = spark.sql(label_query)

# Columns carried through to the modelling frame, in output order:
# filing identifiers, then the financial line items, then the year/target fields.
cols = (
    ['cik', 'company_name', 'assigned_sic', 'accession_number_int', 'filing_date']
    + [
        'CostOfGoodsAndServicesSold',
        'CostOfGoodsSold',
        'CostOfServices',
        'EarningsPerShareBasic',
        'EarningsPerShareDiluted',
        'GainLossOnDispositionOfAssets',
        'GeneralAndAdministrativeExpense',
        'IncomeTaxesPaid',
        'IncreaseDecreaseInAccountsPayable',
        'IncreaseDecreaseInAccountsReceivable',
        'IncreaseDecreaseInAccruedLiabilities',
        'IncreaseDecreaseInInventories',
        'LaborAndRelatedExpense',
        'NetCashProvidedByUsedInFinancingActivities',
        'NetCashProvidedByUsedInFinancingActivitiesContinuingOperations',
        'NetCashProvidedByUsedInInvestingActivities',
        'NetCashProvidedByUsedInInvestingActivitiesContinuingOperations',
        'NetCashProvidedByUsedInOperatingActivities',
        'NetCashProvidedByUsedInOperatingActivitiesContinuingOperations',
        'NetIncomeLoss',
        'OperatingExpenses',
        'OperatingIncomeLoss',
        'PaymentsForRepurchaseOfCommonStock',
        'PaymentsOfDividends',
        'PaymentsOfDividendsCommonStock',
        'PaymentsOfFinancingCosts',
        'PaymentsToAcquireBusinessesNetOfCashAcquired',
        'PaymentsToAcquirePropertyPlantAndEquipment',
        'RepaymentsOfLongTermDebt',
        'ResearchAndDevelopmentExpense',
        'Revenues',
        'SellingAndMarketingExpense',
        'ShareBasedCompensation',
    ]
    + ['year', 'profit_year', 'profit', 'label']
)

from pyspark.ml.feature import VectorAssembler

# Financial line items assembled into the model's feature vector. The order here
# is the order of the vector slots.
numericCols = [
    'CostOfGoodsAndServicesSold',
    'CostOfGoodsSold',
    'CostOfServices',
    'EarningsPerShareBasic',
    'EarningsPerShareDiluted',
    'GainLossOnDispositionOfAssets',
    'GeneralAndAdministrativeExpense',
    'IncomeTaxesPaid',
    'IncreaseDecreaseInAccountsPayable',
    'IncreaseDecreaseInAccountsReceivable',
    'IncreaseDecreaseInAccruedLiabilities',
    'IncreaseDecreaseInInventories',
    'LaborAndRelatedExpense',
    'NetCashProvidedByUsedInFinancingActivities',
    'NetCashProvidedByUsedInFinancingActivitiesContinuingOperations',
    'NetCashProvidedByUsedInInvestingActivities',
    'NetCashProvidedByUsedInInvestingActivitiesContinuingOperations',
    'NetCashProvidedByUsedInOperatingActivities',
    'NetCashProvidedByUsedInOperatingActivitiesContinuingOperations',
    'NetIncomeLoss',
    'OperatingExpenses',
    'OperatingIncomeLoss',
    'PaymentsForRepurchaseOfCommonStock',
    'PaymentsOfDividends',
    'PaymentsOfDividendsCommonStock',
    'PaymentsOfFinancingCosts',
    'PaymentsToAcquireBusinessesNetOfCashAcquired',
    'PaymentsToAcquirePropertyPlantAndEquipment',
    'RepaymentsOfLongTermDebt',
    'ResearchAndDevelopmentExpense',
    'Revenues',
    'SellingAndMarketingExpense',
    'ShareBasedCompensation',
]

from pyspark.ml import Pipeline

# Pack the numeric columns into a single "features" vector column.
assembler = VectorAssembler(
    inputCols=numericCols,
    outputCol="features",
    handleInvalid="keep",
)
pipeline = Pipeline(stages=[assembler])
pipelineModel = pipeline.fit(df)

# Keep the assembled vector plus the columns listed in `cols`.
selectedCols = ['features'] + cols
df = pipelineModel.transform(df).select(selectedCols)
df.printSchema()

root
 |-- cik: integer (nullable = true)
 |-- company_name: string (nullable = true)
 |-- assigned_sic: integer (nullable = true)
 |-- accession_number_int: long (nullable = true)
 |-- filing_date: date (nullable = true)
 |-- CostOfGoodsAndServicesSold: double (nullable = true)
 |-- CostOfGoodsSold: double (nullable = true)
 |-- CostOfServices: double (nullable = true)
 |-- EarningsPerShareBasic: double (nullable = true)
 |-- EarningsPerShareDiluted: double (nullable = true)
 |-- GainLossOnDispositionOfAssets: double (nullable = true)
 |-- GeneralAndAdministrativeExpense: double (nullable = true)
 |-- IncomeTaxesPaid: double (nullable = true)
 |-- IncreaseDecreaseInAccountsPayable: double (nullable = true)
 |-- IncreaseDecreaseInAccountsReceivable: double (nullable = true)
 |-- IncreaseDecreaseInAccruedLiabilities: double (nullable = true)
 |-- IncreaseDecreaseInInventories: double (nullable = true)
 |-- LaborAndRelatedExpense: double (nullable = true)
 |-- NetCashProvidedByUsedInFinan

In [2]:
import mmlspark

from mmlspark.lightgbm import LightGBMRegressor

# Seeded 85/15 split into training and held-out rows.
train, test = df.randomSplit([0.85, 0.15], seed=1)

# Gradient-boosted regressor predicting the `profit` column from `features`.
lgb = LightGBMRegressor(
    featuresCol='features',
    labelCol='profit',
    numLeaves=31,
    numIterations=100,
    learningRate=0.3,
    alpha=0.3,
)
lgbModel = lgb.fit(train)

# Score the held-out rows with the fitted model.
test_model = lgbModel.transform(test)

In [41]:
from pyspark.sql import Row

# Collect the three columns in ONE Spark action. Separate collect() calls each
# launch their own job (re-scoring the test set every time) and give no guarantee
# that the resulting lists are row-aligned with one another.
scored_rows = test_model.select('profit', 'prediction', 'company_name').collect()

# Rebuild single-field Rows so each list has the same shape as a one-column
# select(...).collect() would produce (attribute and positional access both work).
test_true = [Row(profit=r['profit']) for r in scored_rows]
test_predict = [Row(prediction=r['prediction']) for r in scored_rows]
test_company = [Row(company_name=r['company_name']) for r in scored_rows]

In [43]:
# Unwrap the collected Rows into plain Python lists. int() truncates the
# fractional part of the profit and prediction values.
test_true_array = [int(r['profit']) for r in test_true]
test_predict_array = [int(r['prediction']) for r in test_predict]
test_company_array = [str(r['company_name']) for r in test_company]

In [44]:
# Preview the company names of the first 100 collected test rows.
test_company_array[:100]

['ROSETTA STONE INC',
 'WILLAMETTE VALLEY VINEYARDS INC',
 'INGRAM MICRO INC',
 'TRINITY INDUSTRIES INC',
 'DANAHER CORP /DE/',
 'FLOWSERVE CORP',
 'CENTURYLINK, INC',
 'BAXTER INTERNATIONAL INC',
 'AMAG PHARMACEUTICALS, INC.',
 'SYNERON MEDICAL LTD.',
 'XERIUM TECHNOLOGIES INC',
 'LIBBEY INC',
 'GRAPHIC PACKAGING HOLDING CO',
 'CEPHEID',
 'WEYERHAEUSER CO',
 'STANDARD MOTOR PRODUCTS, INC.',
 'STANDARD MOTOR PRODUCTS, INC.',
 'EMAGIN CORP',
 'PCM, INC.',
 'FORTUNE BRANDS HOME & SECURITY, INC.',
 'ACCO BRANDS CORP',
 'SIGMA ALDRICH CORP',
 'AMGEN INC',
 'LYONDELLBASELL INDUSTRIES N.V.',
 'UNITED RENTALS, INC.',
 'TELKONET INC',
 'GORMAN RUPP CO',
 'PULSE EVOLUTION GROUP, INC.',
 'ENVIRO TECHNOLOGIES, INC.',
 'MGP INGREDIENTS INC',
 'GEVO, INC.',
 'WESTERN UNION CO',
 'NORWEGIAN CRUISE LINE HOLDINGS LTD.',
 'AMEDISYS INC',
 'LEJU HOLDINGS LTD',
 'WEST CORP',
 'PZENA INVESTMENT MANAGEMENT, INC.',
 'NXP SEMICONDUCTORS N.V.',
 'EQUINIX INC',
 'UNITED INSURANCE HOLDINGS CORP.',
 'DIAMONDROCK

In [37]:
# Preview the actual profit values (truncated to int) of the first 100 collected test rows.
test_true_array[:100]

[-78850000,
 3636044,
 462352000,
 742200000,
 3431300000,
 277455000,
 2331000000,
 724000000,
 -293261000,
 -6119000,
 46140000,
 72499000,
 342700000,
 3815000,
 870000000,
 71431000,
 81268000,
 -8733000,
 11441000,
 357100000,
 163500000,
 647000000,
 10263000000,
 5460000000,
 591000000,
 369675,
 53305000,
 -1144000,
 -720392,
 41975000,
 -23282000,
 1330000000,
 502941000,
 -112205000,
 43612659,
 480214000,
 78755000,
 2710000000,
 460932000,
 64333000,
 139783000,
 1319000000,
 -3474000,
 126875000,
 90800000,
 479609000,
 22153000,
 76538000,
 625138000,
 324540000,
 -122436000,
 -30751000,
 794300000,
 1285676000,
 746000000,
 321100000,
 -387146,
 3601000000,
 -100540000,
 1646000000,
 38963000,
 -886465000,
 143820000,
 149583000,
 -164000,
 90414000,
 -579000000,
 572562000,
 -38438314,
 -4182658,
 -6076577,
 -3419241,
 -38235000,
 480000,
 1262000,
 -12171315,
 -16663119,
 -93230000,
 -1160406,
 -65809000,
 -4853338,
 -37409000,
 -35780000,
 14151000,
 1439000,
 -12567,

In [40]:
# Preview the predicted profit values (truncated to int) of the first 100 collected test rows.
test_predict_array[:100]

[-24869762,
 23257,
 135167727,
 975576221,
 2749263786,
 1123230429,
 5464198724,
 324794239,
 69338172,
 -16351012,
 23966550,
 61866084,
 348295078,
 -7868147,
 1266447674,
 72464830,
 76571559,
 -4741325,
 38182200,
 149905641,
 157377729,
 490059639,
 -10647926594,
 4377598703,
 336422699,
 23257,
 27093372,
 23257,
 23257,
 27093372,
 -21742941,
 1380363554,
 13495664,
 -119506899,
 61866084,
 468467752,
 71297929,
 1463274429,
 316217033,
 27093372,
 184853508,
 1440052445,
 -4741325,
 147780951,
 66211350,
 819358344,
 27583290,
 2423202,
 633036659,
 183703668,
 58739262,
 -21742941,
 200386391,
 1921819973,
 1116007809,
 302171762,
 23257,
 5688949667,
 23257,
 616952167,
 36525217,
 2703855810,
 126971276,
 333314887,
 23257,
 104219529,
 -462366670,
 685383345,
 -21742941,
 -16351012,
 -4741325,
 -4741325,
 -21742941,
 23257,
 23257,
 -4741325,
 -21742941,
 -33270483,
 23257,
 -36397304,
 -4741325,
 -64239894,
 -21742941,
 23257,
 23257,
 23257,
 -34423043,
 49185981,
 2396

In [3]:
# Feature importances of the fitted regressor, as returned by the model.
print(lgbModel.getFeatureImportances())

from mmlspark.train import ComputeModelStatistics
# Regression statistics comparing `prediction` against `profit` on the scored test rows.
stats_stage = ComputeModelStatistics(
    evaluationMetric='regression',
    labelCol='profit',
    scoresCol='prediction',
)
metrics = stats_stage.transform(test_model)
metrics.toPandas()

[5.0, 28.0, 5.0, 246.0, 153.0, 4.0, 35.0, 104.0, 65.0, 73.0, 8.0, 128.0, 6.0, 143.0, 25.0, 219.0, 62.0, 208.0, 70.0, 177.0, 100.0, 184.0, 132.0, 35.0, 110.0, 4.0, 91.0, 150.0, 71.0, 17.0, 212.0, 4.0, 126.0]


Unnamed: 0,mean_squared_error,root_mean_squared_error,R^2,mean_absolute_error
0,6.698896e+18,2588223000.0,0.887675,339267500.0


In [13]:
# Read the importances from the fitted model instead of pasting a printed copy:
# a hard-coded list silently goes stale whenever the model is retrained.
# There is one value per entry of the assembler's input columns (numericCols).
feature_importance = list(lgbModel.getFeatureImportances())

In [14]:
# Pair each importance with its feature name and order the pairs from most to
# least important. Equal importances fall back to reverse-alphabetical name order,
# because whole (importance, name) tuples are compared.
ranked_pairs = sorted(zip(feature_importance, numericCols), reverse=True)
sorted_feature = [name for _, name in ranked_pairs]

In [17]:
# Keep the 15 highest-ranked feature names.
selected_features = sorted_feature[:15]

In [18]:
# Display the selected feature names.
selected_features

['EarningsPerShareBasic',
 'NetCashProvidedByUsedInInvestingActivities',
 'Revenues',
 'NetCashProvidedByUsedInOperatingActivities',
 'OperatingIncomeLoss',
 'NetIncomeLoss',
 'EarningsPerShareDiluted',
 'PaymentsToAcquirePropertyPlantAndEquipment',
 'NetCashProvidedByUsedInFinancingActivities',
 'PaymentsForRepurchaseOfCommonStock',
 'IncreaseDecreaseInInventories',
 'ShareBasedCompensation',
 'PaymentsOfDividendsCommonStock',
 'IncomeTaxesPaid',
 'OperatingExpenses']

In [16]:
# Display the importance values on their own, largest first.
sorted(feature_importance,reverse=True)

[246.0,
 219.0,
 212.0,
 208.0,
 184.0,
 177.0,
 153.0,
 150.0,
 143.0,
 132.0,
 128.0,
 126.0,
 110.0,
 104.0,
 100.0,
 91.0,
 73.0,
 71.0,
 70.0,
 65.0,
 62.0,
 35.0,
 35.0,
 28.0,
 25.0,
 17.0,
 8.0,
 6.0,
 5.0,
 5.0,
 4.0,
 4.0,
 4.0]

In [4]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator as bce

In [5]:
# Treat the regressor's predicted profit as a ranking score and measure how well
# it separates profitable from unprofitable rows.
# The ground truth must be the binary `label` column (1 when profit >= 0, else 0).
# The dollar-valued `profit` column is a regression target, not a class label.
evaluator = bce(rawPredictionCol='prediction', labelCol='label')
print(evaluator.evaluate(test_model, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model, {evaluator.metricName: 'areaUnderPR'}))

0.852963495439871
0.890500542759393


In [6]:
from mmlspark.lightgbm import LightGBMClassifier
# Gradient-boosted classifier predicting the binary `label` column from `features`.
lgbClassifier = LightGBMClassifier(
    featuresCol='features',
    labelCol='label',
    numLeaves=31,
    numIterations=100,
    learningRate=0.3,
)

In [7]:
# Fit the classifier on the training split.
lgbClassifierModel = lgbClassifier.fit(train)

In [8]:
# Score the held-out split with the fitted classifier.
test_class = lgbClassifierModel.transform(test)

In [9]:
# Area under the ROC and PR curves for the classifier, scored on its
# `probability` output.
# The ground truth must be the binary `label` column the classifier was trained
# on. The dollar-valued `profit` column is not a class label.
evaluator = bce(rawPredictionCol='probability', labelCol='label')
print(evaluator.evaluate(test_class, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_class, {evaluator.metricName: 'areaUnderPR'}))

0.9409936280553806
0.9405454155891646


In [11]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Cross-check of the classifier's ROC AUC with the evaluator's standard columns
# spelled out explicitly (the original relied on these same defaults).
# The evaluator is NOT bound to the name `eval`: that would shadow Python's
# builtin eval() for the rest of the kernel session.
roc_evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction',
    labelCol='label',
    metricName='areaUnderROC',
)
print('Test Area Under ROC', roc_evaluator.evaluate(test_class))

Test Area Under ROC 0.9409936280553806


In [12]:
from sklearn.metrics import classification_report, confusion_matrix

# Collect labels and predictions in ONE Spark action so the two lists are
# guaranteed to be row-aligned. Two independent collect() calls each re-run the
# scoring job and give no such guarantee.
label_and_pred = test_class.select('label', 'prediction').collect()

# Hand sklearn flat lists of ints rather than lists of Row objects, which it
# would otherwise have to coerce from an (n, 1) shape.
y_true = [int(row['label']) for row in label_and_pred]
y_pred = [int(row['prediction']) for row in label_and_pred]

print(classification_report(y_true, y_pred))
# Confusion matrix for label order [0, 1].
confusion_matrix(y_true, y_pred, labels=[0, 1])

              precision    recall  f1-score   support

           0       0.87      0.87      0.87      1174
           1       0.89      0.89      0.89      1448

    accuracy                           0.88      2622
   macro avg       0.88      0.88      0.88      2622
weighted avg       0.88      0.88      0.88      2622



array([[1021,  153],
       [ 156, 1292]])