In [32]:
#import pyspark
#sc = pyspark.SparkContext.getOrCreate()
#sqlContext = pyspark.sql.SQLContext(sc)
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql.types import ArrayType, IntegerType, StringType
import xlrd
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt
#%matplotlib inline

In [2]:
sc

In [28]:

def read_excel_xlrd(path):
    xl_workbook = xlrd.open_workbook(path)
    #sheet_names = xl_workbook.sheet_names()
    #print('Sheet Names', sheet_names)
    #xl_sheet = xl_workbook.sheet_by_name(sheet_names[0])
    xl_sheet = xl_workbook.sheet_by_index(0)
    #print ('Sheet name: %s' % xl_sheet.name)
    data = []
    for row in range(xl_sheet.nrows):
        line = []
        for col in range(xl_sheet.ncols):
            line.append(str(xl_sheet.cell(row, col).value))
        data.append(line)
    columnNames = data[0]
    dataValues = data[1:]
    dataPairs = []
    for row in range(len(dataValues)):
        line = {}
        for col in range(len(columnNames)):
            line[columnNames[col]] = dataValues[row][col]
        dataPairs.append(Row(**line))
    dataRDD = sc.parallelize(dataPairs)
    df = dataRDD.toDF()
    return df

def topics_get(df, columnName, delimiter):
    rows = df.groupBy(columnName).count().collect()
    results = []
    for row in rows:
        for item in row[columnName].split(delimiter):
            if item not in results:
                results.append(item)
    results.sort()
    return results

def exist_list(list_all, list_compared):
    results = []
    for item in list_all:
        if item in list_compared:
            results.append(1)
        else:
            results.append(0)
    return results

def df_column_add(df, input_colName, output_colName, function, returnType):
    function_udf = udf(lambda item: function(item), returnType)
    return df.withColumn(output_colName, function_udf(col(input_colName)))

def countEmptyAndNull(df):
    for columnName in df.columns:
        print(r"Column '%s' has %d '', %d 'noInfo', %d 'None', %d filled, and %d null rows." 
              % (columnName, df.filter(df[columnName]=='').count(), df.filter(df[columnName]=='noInfo').count(),
                 df.filter(df[columnName]=='None').count(), df.filter(df[columnName]!='').count(),
                 df.filter(df[columnName].isNull()).count()))


In [4]:
df = read_excel_xlrd("lib-statistics.xlsx")
print(df.filter(df['Ödünç Sayısı'].isNull()).count())
print(df.filter(df['Ödünç Sayısı']=='').count())
df = df.withColumn('Sınıflama', split(df['Sınıflama'], " ")[0])
df = df.withColumn('Ödünç Sayısı', df['Ödünç Sayısı'].cast('float'))
print(df.filter(df['Ödünç Sayısı'].isNull()).count())
df = df.fillna({'Sınıflama' : 'noInfo', 'Eser Adı' : 'noInfo', 'Yazar' : 'noInfo', 'Dil' : 'noInfo', 'Konu Başlıkları' : 'noInfo' , 'Ödünç Sayısı' : 0.0})
print(df.filter(df['Ödünç Sayısı'].isNull()).count())

0
598
598
0


In [5]:
df.select(sum(df['Ödünç Sayısı'])).show()

+-----------------+
|sum(Ödünç Sayısı)|
+-----------------+
|            339.0|
+-----------------+



In [6]:
df.printSchema()

root
 |-- Dil: string (nullable = false)
 |-- Eser Adı: string (nullable = false)
 |-- Konu Başlıkları: string (nullable = false)
 |-- Sınıflama: string (nullable = false)
 |-- Yazar: string (nullable = false)
 |-- Ödünç Sayısı: float (nullable = false)



In [7]:
df.show()

+---+--------------------+--------------------+---------+--------------------+------------+
|Dil|            Eser Adı|     Konu Başlıkları|Sınıflama|               Yazar|Ödünç Sayısı|
+---+--------------------+--------------------+---------+--------------------+------------+
|eng|Museum frictions ...|Museums--Social A...|       AM|                    |         0.0|
|tur|Tahayyül gücünü y...|Imagination (Phil...|        B|                    |         0.0|
|tur|Bir yol var : Min...|Ontology
Stress M...|        B|        Damcı, Taner|         0.0|
|tur|Psikoloji şerhi =...|Aristotle
Aristot...|        B|            İbn Rüşd|         2.0|
|eng|An examination of...|Özel, İsmet, 1944...|        B|      Kaya, Vefa Can|         0.0|
|eng|An examination of...|Özel, İsmet, 1944...|        B|      Kaya, Vefa Can|         0.0|
|tur|Ruhun uyanışı, ya...|Philosophy, Islam...|        B|İbn Ṭufeyl, Muḥam...|         0.0|
|tur|Çağımızın sorunla...|Philosophy, Moder...|        B|   Russell, Bertrand|  

In [29]:
df.filter(df['Konu Başlıkları']=='').show()
df.filter(df['Konu Başlıkları']=='noInfo').show()
df.filter(df['Konu Başlıkları']=='None').show()
df.filter(df['Konu Başlıkları'].isNull()).show()
df.filter(df['Ödünç Sayısı'] != 0.0).count()

+---+--------------------+---------------+---------+--------------------+------------+
|Dil|            Eser Adı|Konu Başlıkları|Sınıflama|               Yazar|Ödünç Sayısı|
+---+--------------------+---------------+---------+--------------------+------------+
|tur|Edebiyat tarihi y...|               |       PL|Erünsal, İsmail E.|         1.0|
+---+--------------------+---------------+---------+--------------------+------------+

+---+--------+---------------+---------+-----+------------+
|Dil|Eser Adı|Konu Başlıkları|Sınıflama|Yazar|Ödünç Sayısı|
+---+--------+---------------+---------+-----+------------+
+---+--------+---------------+---------+-----+------------+

+---+--------+---------------+---------+-----+------------+
|Dil|Eser Adı|Konu Başlıkları|Sınıflama|Yazar|Ödünç Sayısı|
+---+--------+---------------+---------+-----+------------+
+---+--------+---------------+---------+-----+------------+

+---+--------+---------------+---------+-----+------------+
|Dil|Eser Adı|Konu Baş

153

In [30]:
countEmptyAndNull(df)

Column 'Dil' has 0 '', 0 'noInfo', 0 'None', 751 filled, and 0 null rows.
Column 'Eser Adı' has 0 '', 0 'noInfo', 0 'None', 751 filled, and 0 null rows.
Column 'Konu Başlıkları' has 1 '', 0 'noInfo', 0 'None', 750 filled, and 0 null rows.
Column 'Sınıflama' has 0 '', 0 'noInfo', 0 'None', 751 filled, and 0 null rows.
Column 'Yazar' has 294 '', 0 'noInfo', 0 'None', 457 filled, and 0 null rows.
Column 'Ödünç Sayısı' has 0 '', 0 'noInfo', 0 'None', 0 filled, and 0 null rows.


In [14]:
df.count()

751

In [15]:
#topics = topics_get(df, 'Konu Başlıkları', '\n')
#print(topics)

In [31]:
def function_topics(item):
    list1 = item.replace(' ', '_').split('\n')
    list1.sort()
    return ' '.join(list1)

def function_labels(item):
    pass

df_1 = df_column_add(df, 'Konu Başlıkları', 'topic titles', function_topics, StringType())


In [36]:
tokenizer = Tokenizer(inputCol='topic titles', outputCol='words')
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol='features')
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(df_1)

test = sqlContext.createDataFrame([
    Row(text="You will get a prize. To claim call 09061701461. Claim code KL341. Valid 12 hours only."),
    Row(text="Even my brother is not like to speak with me. They treat me like aids patent.")])

#prediction = model.transform(test)

IllegalArgumentException: 'requirement failed: Column Sınıflama must be of type NumericType but was actually of type StringType.'

In [23]:
df2 = df.groupBy('Dil').count()
df2.select(sum(df2['count'])).show()

df.select(sum(df['Ödünç Sayısı'])).show()

print(df.filter(df['Ödünç Sayısı'].isNull()).count())

df.groupBy(df['Sınıflama']).count().show()

df_s = df.groupBy(df['Sınıflama']).count()
print(df_s.filter(df_s['count'] > 1).count())
print(df_s.filter(df_s['count'] < 1).count())
print(df_s.filter(df_s['count'] == 1).count())

+----------+
|sum(count)|
+----------+
|       751|
+----------+

+-----------------+
|sum(Ödünç Sayısı)|
+-----------------+
|            339.0|
+-----------------+

0
+---------+-----+
|Sınıflama|count|
+---------+-----+
|        K|    3|
|       UA|    1|
|       LA|    4|
|       BS|    4|
|       NK|    2|
|       AM|    1|
|       PL|   90|
|       PS|    7|
|       DR|   83|
|        F|    1|
|       JF|    1|
|       BP|  153|
|       NX|    1|
|        Q|    7|
|       JC|    6|
|       NA|    6|
|       HV|    9|
|       BX|    1|
|       VA|    1|
|        E|    1|
+---------+-----+
only showing top 20 rows

66
0
30


In [24]:
df_s.filter(df_s['Sınıflama'].like('%D%')).orderBy('Sınıflama').show()

+---------+-----+
|Sınıflama|count|
+---------+-----+
|       BD|    2|
|        D|   17|
|       DD|    2|
|       DF|    4|
|       DG|    2|
|       DK|    4|
|       DR|   83|
|       DS|   20|
|       DT|    1|
|       HD|    7|
|       ND|    2|
|       RD|    1|
+---------+-----+



In [26]:
#df.groupBy(df['Sınıflama']).agg(collect_list('Konu Başlıkları').alias('gruplanmış konu başlıkları')).show()
df.groupBy(df['Sınıflama']).agg(collect_list('Konu Başlıkları'), collect_list('Yazar')).show()
#df.filter(df['Ödünç Sayısı'] != 0.0).select("Eser Adı").collect()

+---------+-----------------------------+--------------------+
|Sınıflama|collect_list(Konu Başlıkları)| collect_list(Yazar)|
+---------+-----------------------------+--------------------+
|        K|         [Family--Ethics--...|[Ataseven, Gülsen...|
|       UA|         [North Atlantic T...| [Jordan, Robert S.]|
|       LA|         [Cevdet, Mehmed, ...|[Ergin, Osman Nur...|
|       BS|         [Turkish Literatu...|[Göçmenoğlu, Kası...|
|       NK|         [Hilye-i Şerif--E...|[, Kuşoğlu, Mehme...|
|       AM|         [Museums--Social ...|                  []|
|       PL|         [Turkish Language...|[Yüzendağ, Ahmet,...|
|       PS|         [Rich People--Fic...|[Fitzgerald, Fran...|
|       DR|         [Insurgency--Serb...|[Özkan, Ayşe, , ,...|
|        F|         [California, Sout...|       [Sanger, Kay]|
|       JF|         [Executive Power-...|                  []|
|       BP|         [Islam--Periodica...|[, , , , , , , , ...|
|       NX|         [Piri Reis--Maps-...|              

In [20]:
#df3 = df.groupBy(split(split(df['Sınıflama'], " ")[0], "")[0].alias("group")).count().orderBy("group")
#group_count = df3.count()
#groups = np.empty(group_count, dtype="S30")
#counts = np.empty(group_count)
#for (index,row) in enumerate(df3.collect()):
#    groups[index] = row['group']
#    counts[index] = row['count']
#sns.barplot(x=groups, y=counts)
#plt.xticks(rotation='vertical')
#sns.despine()

In [21]:
#df4 = df.groupBy(df['Dil']).count().orderBy('Dil')
#lang_count = df4.count()
#langs = np.empty(lang_count, dtype="S30")
#counts = np.empty(lang_count)
#for (index,row) in enumerate(df4.collect()):
#    langs[index] = row['Dil']
#    counts[index] = row['count']
#sns.barplot(x=langs, y=counts)
#plt.xticks(rotation='vertical')
#sns.despine()

In [22]:
# Imports
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
# First we create a RDD in order to create a dataFrame:
rdd = sc.parallelize([('u1', 1, [1 ,2, 3]), ('u1', 4, [1, 2, 3])])
df2 = rdd.toDF(['user', 'item', 'fav_items'])
# Print dataFrame
df2.show()

# We make an user define function that receives two columns and do operation
function = udf(lambda item, items: 1 if item in items else 0, IntegerType())

df2.select('user', 'item', 'fav_items', function(col('item'), col('fav_items')).alias('result')).show()

+----+----+---------+
|user|item|fav_items|
+----+----+---------+
|  u1|   1|[1, 2, 3]|
|  u1|   4|[1, 2, 3]|
+----+----+---------+

+----+----+---------+------+
|user|item|fav_items|result|
+----+----+---------+------+
|  u1|   1|[1, 2, 3]|     1|
|  u1|   4|[1, 2, 3]|     0|
+----+----+---------+------+



In [49]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType, StringType

list_all1 = ['a', 'b', 'c', 'd', 'e']

rdd = sc.parallelize([('u1', 'a\nb'), ('u1', 'd\ne\na')])
df1 = rdd.toDF(['user', 'items'])

df1.show()

def function2(item):
    list1 = item.split('\n')
    list1.sort()
    return ' '.join(list1)

df_column_add(df1, 'items', 'result', function2, StringType()).show()

def function3(item): return exist_list(list_all1, item.split('\n'))

#df_column_add(df1, 'items', 'result', function3, ArrayType(IntegerType())).show()


+----+-----+
|user|items|
+----+-----+
|  u1|  a
b|
|  u1|d
e
a|
+----+-----+

+----+-----+------+
|user|items|result|
+----+-----+------+
|  u1|  a
b|   a b|
|  u1|d
e
a| a d e|
+----+-----+------+

