In [1]:
import findspark
findspark.init('/home/sdmohant/spark-3.0.0-bin-hadoop2.7')

In [2]:
import pyspark
from pyspark import SparkContext,SparkConf

In [3]:
from pyspark.sql import SparkSession

In [4]:
#spark = SparkSession.builder.appName("MyApp").getOrCreate()
#spark = SparkSession.builder.appName("MyName").getOrCreate()
conf = SparkConf().set("spark.cores.max", "16") \
    .set("spark.driver.memory", "16g") \
    .set("spark.executor.memory", "16g") \
    .set("spark.executor.memory_overhead", "16g") \
    .set("spark.driver.maxResultsSize", "0") \
    .set("spark.sql.shuffle.partitions","300")

sc = SparkContext(appName="chemmodel", conf=conf)
spark = SparkSession(sc)



In [5]:
from pyspark.sql.functions import isnan, count, col

In [6]:
df1 = spark.read.options(header = True, inferSchema = True,delimiter=',').csv("ChemistryData.csv")

In [7]:
from pyspark.sql.functions import isnan, when, count, col

In [8]:
df1.select([count(when(col(c).isNull(),c)).alias(c) for c in df1.columns]).show()

+-------+------+-------+-----------------+--------------+---------------+-------------+----------------+----+--------+---------+---------+---------+------------+--------------+--------+--------+---------+--------+--------------+-------------+--------------+
|PaperID|FOS_ID|FOSRank|FOSNormalizedName|FOSDisplayName|FOSDisplayLevel|FOSPaperCount|FOSCitationCount|Rank| DocType|PaperYear|Publisher|JournalID|ConfSeriesID|ConfInstanceID|  Volume|   Issue|FirstPage|LastPage|ReferenceCount|CitationCount|EstimatedCount|
+-------+------+-------+-----------------+--------------+---------------+-------------+----------------+----+--------+---------+---------+---------+------------+--------------+--------+--------+---------+--------+--------------+-------------+--------------+
|      0|     0|      0|                0|             0|              0|            0|               0|   0|10931386|        0| 14591008| 12306008|    37067576|      37258483|13531480|16883559| 14440171|15950007|             

In [9]:
df1.select("Publisher").distinct().count()

152851

In [10]:
drop_cols = ["FOS_ID","FOSNormalizedName","FOSDisplayName","Publisher","FirstPage","LastPage"]

In [11]:
df2 = df1.drop(*drop_cols)

In [12]:
from pyspark.sql import functions as F

In [13]:
df2 = df2.withColumn("Volume", F.when(F.col("Volume").isNull(), 0).otherwise(F.col("Volume")))

In [14]:
df2 = df2.withColumn("Issue", F.when(F.col("Issue").isNull(), 0).otherwise(F.col("Issue")))

In [15]:
df2.select([count(when(col(c).isNull(),c)).alias(c) for c in df2.columns]).show()

+-------+-------+---------------+-------------+----------------+----+--------+---------+---------+------------+--------------+------+-----+--------------+-------------+--------------+
|PaperID|FOSRank|FOSDisplayLevel|FOSPaperCount|FOSCitationCount|Rank| DocType|PaperYear|JournalID|ConfSeriesID|ConfInstanceID|Volume|Issue|ReferenceCount|CitationCount|EstimatedCount|
+-------+-------+---------------+-------------+----------------+----+--------+---------+---------+------------+--------------+------+-----+--------------+-------------+--------------+
|      0|      0|              0|            0|               0|   0|10931386|        0| 12306008|    37067576|      37258483|     0|    0|             0|            0|             0|
+-------+-------+---------------+-------------+----------------+----+--------+---------+---------+------------+--------------+------+-----+--------------+-------------+--------------+



In [16]:
df2 = df2.withColumn("DocType", F.when(F.col("DocType").isNull(), "None").otherwise(F.col("DocType")))

In [17]:
df2.select([count(when(col(c).isNull(),c)).alias(c) for c in df2.columns]).show()

+-------+-------+---------------+-------------+----------------+----+-------+---------+---------+------------+--------------+------+-----+--------------+-------------+--------------+
|PaperID|FOSRank|FOSDisplayLevel|FOSPaperCount|FOSCitationCount|Rank|DocType|PaperYear|JournalID|ConfSeriesID|ConfInstanceID|Volume|Issue|ReferenceCount|CitationCount|EstimatedCount|
+-------+-------+---------------+-------------+----------------+----+-------+---------+---------+------------+--------------+------+-----+--------------+-------------+--------------+
|      0|      0|              0|            0|               0|   0|      0|        0| 12306008|    37067576|      37258483|     0|    0|             0|            0|             0|
+-------+-------+---------------+-------------+----------------+----+-------+---------+---------+------------+--------------+------+-----+--------------+-------------+--------------+



In [18]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="DocType", outputCol="DocTypeIndex")
indexed = indexer.fit(df2).transform(df2)
indexed.show(2)

+-------+-------+---------------+-------------+----------------+-----+-------+---------+---------+------------+--------------+------+-----+--------------+-------------+--------------+------------+
|PaperID|FOSRank|FOSDisplayLevel|FOSPaperCount|FOSCitationCount| Rank|DocType|PaperYear|JournalID|ConfSeriesID|ConfInstanceID|Volume|Issue|ReferenceCount|CitationCount|EstimatedCount|DocTypeIndex|
+-------+-------+---------------+-------------+----------------+-----+-------+---------+---------+------------+--------------+------+-----+--------------+-------------+--------------+------------+
| 142535|   8764|              2|       127513|         1735654|28587|   None|     1966|     null|        null|          null|     0|    0|             0|            0|             0|         1.0|
| 142535|  10018|              1|       667379|         5630071|28587|   None|     1966|     null|        null|          null|     0|    0|             0|            0|             0|         1.0|
+-------+------

In [19]:
df2 = df2.withColumn("IsJournal", F.when(F.col("DocType").isNotNull(), 0).otherwise(1))

In [20]:
df2 = df2.withColumn("PaperAge", 2020-df2["PaperYear"])

In [21]:
df2.show(2)

+-------+-------+---------------+-------------+----------------+-----+-------+---------+---------+------------+--------------+------+-----+--------------+-------------+--------------+---------+--------+
|PaperID|FOSRank|FOSDisplayLevel|FOSPaperCount|FOSCitationCount| Rank|DocType|PaperYear|JournalID|ConfSeriesID|ConfInstanceID|Volume|Issue|ReferenceCount|CitationCount|EstimatedCount|IsJournal|PaperAge|
+-------+-------+---------------+-------------+----------------+-----+-------+---------+---------+------------+--------------+------+-----+--------------+-------------+--------------+---------+--------+
| 142535|   8764|              2|       127513|         1735654|28587|   None|     1966|     null|        null|          null|     0|    0|             0|            0|             0|        0|      54|
| 142535|  10018|              1|       667379|         5630071|28587|   None|     1966|     null|        null|          null|     0|    0|             0|            0|             0|     

In [22]:
drop_cols = ["PaperID","DocType","JournalID","ConfSeriesID","ConfInstanceID","EstimatedCount"]
df3= df2.drop(*drop_cols)

In [23]:
df3.select([count(when(col(c).isNull(),c)).alias(c) for c in df3.columns]).show()

+-------+---------------+-------------+----------------+----+---------+------+-----+--------------+-------------+---------+--------+
|FOSRank|FOSDisplayLevel|FOSPaperCount|FOSCitationCount|Rank|PaperYear|Volume|Issue|ReferenceCount|CitationCount|IsJournal|PaperAge|
+-------+---------------+-------------+----------------+----+---------+------+-----+--------------+-------------+---------+--------+
|      0|              0|            0|               0|   0|        0|     0|    0|             0|            0|        0|       0|
+-------+---------------+-------------+----------------+----+---------+------+-----+--------------+-------------+---------+--------+



In [24]:
df3 = df3.withColumnRenamed("CitationCount","label")

In [25]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer
feature_list = []
for col in df3.columns:
    if col == 'label':
        continue
    else:
        feature_list.append(col)
# Concatenates all feature columns into a single feature vector in a new column "rawFeatures"
vectorAssembler = VectorAssembler(inputCols=feature_list, outputCol="features")
# Identifies categorical features and indexes them
#vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=4)

In [26]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [27]:
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(labelCol="label", featuresCol="features")
#rf = RandomForestRegressor(featuresCol="features")

In [28]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, rf])

In [29]:
from pyspark.ml.tuning import ParamGridBuilder
import numpy as np

paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [int(x) for x in np.linspace(start = 10, stop = 50, num = 3)]) \
    .addGrid(rf.maxDepth, [int(x) for x in np.linspace(start = 5, stop = 25, num = 3)]) \
    .build()

In [30]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=3)

In [31]:
#(train, test) = df3.randomSplit([0.7, 0.3])

In [32]:
#cvModel = crossval.fit(train)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/sdmohant/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "/home/sdmohant/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1212, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
Py4JNetworkError: Error while receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/sdmohant/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "/home/sdmohant/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1212, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error o

Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:38841)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:38841)
Traceback (most recent call last):
  File "/home/sdmohant/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
  File "/usr/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:38841)
Traceback (most recent call last):
  File "/home/sdmohant/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
  File "/usr/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:3884

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:38841)
Traceback (most recent call last):
  File "/home/sdmohant/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
  File "/usr/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:38841)
Traceback (most recent call last):
  File "/home/sdmohant/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
  File "/usr/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:3884

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:38841)
Traceback (most recent call last):
  File "/home/sdmohant/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
  File "/usr/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:38841)
Traceback (most recent call last):
  File "/home/sdmohant/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
  File "/usr/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:3884

In [None]:
#predictions = cvModel.transform(test)

In [None]:
import matplotlib.pyplot as plt

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

rmse = evaluator.evaluate(predictions)

rfPred = cvModel.transform(df2)

rfResult = rfPred.toPandas()

plt.plot(rfResult.label, rfResult.prediction, 'bo')
plt.xlabel('CitationCount')
plt.ylabel('Prediction')
plt.suptitle("Model Performance RMSE: %f" % rmse)
plt.show()

In [None]:
bestPipeline = cvModel.bestModel
bestModel = bestPipeline.stages[1]

importances = bestModel.featureImportances

x_values = list(range(len(importances)))

plt.bar(x_values, importances, orientation = 'vertical')
plt.xticks(x_values, feature_list, rotation=80)
plt.ylabel('Importance')
plt.xlabel('Feature')
plt.title('Feature Importances')

In [31]:
df3.columns

['FOSRank',
 'FOSDisplayLevel',
 'FOSPaperCount',
 'FOSCitationCount',
 'Rank',
 'PaperYear',
 'Volume',
 'Issue',
 'ReferenceCount',
 'label',
 'IsJournal',
 'PaperAge']

In [32]:
df3_reorder = df3.select("FOSRank","FOSDisplayLevel","FOSPaperCount","Rank","PaperAge","Volume","Issue","ReferenceCount","IsJournal","label")

In [33]:
df3_reorder.columns

['FOSRank',
 'FOSDisplayLevel',
 'FOSPaperCount',
 'Rank',
 'PaperAge',
 'Volume',
 'Issue',
 'ReferenceCount',
 'IsJournal',
 'label']

In [36]:
df3_reorder = df3_reorder.toPandas()

In [41]:
X = df3_reorder.iloc[:1000000, 0:9].values
y = df3_reorder.iloc[:1000000, 9].values

In [42]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0)

In [43]:
# Feature Scaling
from sklearn.preprocessing import StandardScaler

sc = StandardScaler()
X_train = sc.fit_transform(X_train)
X_test = sc.transform(X_test)

In [44]:
from sklearn.ensemble import RandomForestRegressor

regressor = RandomForestRegressor(n_estimators=20, random_state=0)
regressor.fit(X_train, y_train)
y_pred = regressor.predict(X_test)

In [46]:
from sklearn import metrics
print('Mean Absolute Error:', metrics.mean_absolute_error(y_test, y_pred))
print('Mean Squared Error:', metrics.mean_squared_error(y_test, y_pred))
print('Root Mean Squared Error:', np.sqrt(metrics.mean_squared_error(y_test, y_pred)))

('Mean Absolute Error:', 3.259182277508195)
('Mean Squared Error:', 252.01709513611897)
('Root Mean Squared Error:', 15.875046303432283)


In [47]:
X = df3_reorder.iloc[:10000000, 0:9].values
y = df3_reorder.iloc[:10000000, 9].values

In [48]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0)

In [49]:
# Feature Scaling
from sklearn.preprocessing import StandardScaler

sc = StandardScaler()
X_train = sc.fit_transform(X_train)
X_test = sc.transform(X_test)

In [50]:
from sklearn.ensemble import RandomForestRegressor

regressor = RandomForestRegressor(n_estimators=20, random_state=0)
regressor.fit(X_train, y_train)
y_pred = regressor.predict(X_test)

In [51]:
from sklearn import metrics
print('Mean Absolute Error on 10 million records:', metrics.mean_absolute_error(y_test, y_pred))
print('Mean Squared Error on 10 million records:', metrics.mean_squared_error(y_test, y_pred))
print('Root Mean Squared Error on 10 million records:', np.sqrt(metrics.mean_squared_error(y_test, y_pred)))

('Mean Absolute Error on 10 million records:', 3.294705692045973)
('Mean Squared Error on 10 million records:', 3952.817109111224)
('Root Mean Squared Error on 10 million records:', 62.87143317207923)


In [53]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=1)

In [54]:
# Feature Scaling
from sklearn.preprocessing import StandardScaler

sc = StandardScaler()
X_train = sc.fit_transform(X_train)
X_test = sc.transform(X_test)

In [55]:
regressor = RandomForestRegressor(n_estimators=20, random_state=0)
regressor.fit(X_train, y_train)
y_pred = regressor.predict(X_test)

In [56]:
from sklearn import metrics
print('Mean Absolute Error on 10 million records-2:', metrics.mean_absolute_error(y_test, y_pred))
print('Mean Squared Error on 10 million records-2:', metrics.mean_squared_error(y_test, y_pred))
print('Root Mean Squared Error on 10 million records-2:', np.sqrt(metrics.mean_squared_error(y_test, y_pred)))

('Mean Absolute Error on 10 million records-2:', 3.2617075261633905)
('Mean Squared Error on 10 million records-2:', 2446.6784243957372)
('Root Mean Squared Error on 10 million records-2:', 49.463910322534524)


In [57]:
from matplotlib import pyplot
# get importance
importance = regressor.feature_importances_
# summarize feature importance
for i,v in enumerate(importance):
	print('Feature: %0d, Score: %.5f' % (i,v))
# plot feature importance
pyplot.bar([x for x in range(len(importance))], importance)
pyplot.show()

Feature: 0, Score: 0.05248
Feature: 1, Score: 0.00022
Feature: 2, Score: 0.00723
Feature: 3, Score: 0.51438
Feature: 4, Score: 0.37234
Feature: 5, Score: 0.02254
Feature: 6, Score: 0.01259
Feature: 7, Score: 0.01821
Feature: 8, Score: 0.00000


<Figure size 640x480 with 1 Axes>