In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = ( SparkSession.builder.appName("Stack Overflow Data Wrangling").config(
"spark.jars","C:/home/timothy/Desktop/Blossom/5/postgresql-42.2.8.jar")
        .getOrCreate()
        )

In [3]:
questions = spark.read.csv('questions2.csv',header=True, inferSchema=True, multiLine = True)
users = spark.read.csv('users2.csv',header=True, inferSchema=True, multiLine = True)
answers = spark.read.csv('answers2.csv',header=True, inferSchema=True, multiLine = True)

In [4]:
questions.columns

['id',
 'user_id',
 'title',
 'body',
 'accepted_answer_id',
 'score',
 'view_count',
 'comment_count',
 'created_at']

In [5]:
users.columns

['id',
 'display_name',
 'reputation',
 'website_url',
 'location',
 'about_me',
 'views',
 'up_votes',
 'down_votes',
 'image_url',
 'created_at',
 'updated_at']

In [6]:
answers.columns

['id',
 'user_id',
 'question_id',
 'body',
 'score',
 'comment_count',
 'created_at']

In [7]:
questions = questions.withColumnRenamed('id','questions_id')
users = users.withColumnRenamed('id','users_id')
answers = answers.withColumnRenamed('id','answers_id')

questions = questions.withColumnRenamed('user_id','questions_user_id')
answers = answers.withColumnRenamed('user_id','answers_user_id')

questions = questions.withColumnRenamed('body','questions_body')
answers = answers.withColumnRenamed('body','answers_body')

questions = questions.withColumnRenamed('score','questions_score')
answers = answers.withColumnRenamed('score','answers_score')

questions = questions.withColumnRenamed('comment_count','questions_comment_count')
answers = answers.withColumnRenamed('comment_count','answers_comment_count')

questions = questions.withColumnRenamed('created_at','questions_created_at')
users = users.withColumnRenamed('created_at','users_created_at')
answers = answers.withColumnRenamed('created_at','answers_created_at')

In [8]:
answers.columns

['answers_id',
 'answers_user_id',
 'question_id',
 'answers_body',
 'answers_score',
 'answers_comment_count',
 'answers_created_at']

In [9]:
questions.columns

['questions_id',
 'questions_user_id',
 'title',
 'questions_body',
 'accepted_answer_id',
 'questions_score',
 'view_count',
 'questions_comment_count',
 'questions_created_at']

In [10]:
users.select("location").show(100)

+--------------------+
|            location|
+--------------------+
|Bangalore, Karnat...|
|              Canada|
|Pennsylvania, Uni...|
|                null|
|New Delhi, Delhi,...|
|                null|
|          日本 Tōkyō|
|                null|
|                null|
|Gharaunda, Haryan...|
|                null|
|Passo Fundo, RS, ...|
|                null|
|    New Delhi, India|
|                null|
|      Patras, Greece|
|              Turkey|
|Jalandhar, Punjab...|
|Surat, Gujarat, I...|
|Makassar, Kota Ma...|
|Darjeeling, West ...|
|      Bronx, NY, USA|
|                null|
|Pune, Maharashtra...|
|Mumbai, Maharasht...|
|Bangalore, Karnat...|
|              Serbia|
|                null|
|                null|
| Woodbridge, VA, USA|
|                null|
|             Ukraine|
|Mumbai, Maharasht...|
|                null|
|                null|
|       United States|
|      Luso, Portugal|
| Berlin, Deutschland|
|Bangalore, Karnat...|
|                null|
|            

In [11]:
from pyspark.sql import functions as F

In [12]:
users.count()

272717

In [13]:
users.groupBy('location').count().distinct().sort('count',ascending = False).show(10, truncate = False)

+---------------------------+------+
|location                   |count |
+---------------------------+------+
|null                       |142162|
|India                      |2957  |
|Bangalore, Karnataka, India|2305  |
|Germany                    |2065  |
|Pune, Maharashtra, India   |1445  |
|Chennai, Tamil Nadu, India |1100  |
|France                     |1080  |
|Hyderabad, Telangana, India|1076  |
|Mumbai, Maharashtra, India |998   |
|Paris, France              |995   |
+---------------------------+------+
only showing top 10 rows



In [14]:
#users use the contains function and drop null values.

In [15]:
users = users.na.drop(subset=["location"])
users.groupBy('location').count().distinct().sort('count',ascending = False).show(50, truncate = False)

+---------------------------+-----+
|location                   |count|
+---------------------------+-----+
|India                      |2957 |
|Bangalore, Karnataka, India|2305 |
|Germany                    |2065 |
|Pune, Maharashtra, India   |1445 |
|Chennai, Tamil Nadu, India |1100 |
|France                     |1080 |
|Hyderabad, Telangana, India|1076 |
|Mumbai, Maharashtra, India |998  |
|Paris, France              |995  |
|Ahmedabad, Gujarat, India  |989  |
|Bengaluru, Karnataka, India|935  |
|Israel                     |921  |
|London, UK                 |892  |
|London, United Kingdom     |887  |
|United States              |829  |
|United Kingdom             |779  |
|Netherlands                |753  |
|USA                        |658  |
|Singapore                  |632  |
|Canada                     |630  |
|Delhi, India               |581  |
|Dhaka, Bangladesh          |579  |
|UK                         |577  |
|Berlin, Germany            |572  |
|Poland                     

In [16]:
users.filter(users.location.contains('germany')).count()

7

In [17]:
users.registerTempTable('new_users')
new_users = spark.sql("select * from new_users where location like '%India%' ")

In [18]:
new_users.select("location").show()

+--------------------+
|            location|
+--------------------+
|Bangalore, Karnat...|
|New Delhi, Delhi,...|
|Gharaunda, Haryan...|
|    New Delhi, India|
|Jalandhar, Punjab...|
|Surat, Gujarat, I...|
|Darjeeling, West ...|
|Pune, Maharashtra...|
|Mumbai, Maharasht...|
|Bangalore, Karnat...|
|Mumbai, Maharasht...|
|Bangalore, Karnat...|
|Mumbai, Maharasht...|
|Hyderabad, Telang...|
|Indore, Madhya Pr...|
|               India|
|Bangalore, Karnat...|
|Naya Raipur, Chha...|
|Bangalore, Karnat...|
|Chennai, Tamil Na...|
+--------------------+
only showing top 20 rows



In [19]:
new_users = new_users.withColumn("location", F.regexp_replace('location', r"[,]\s*\w*\s*[,]", ','))

In [20]:
new_users.columns

['users_id',
 'display_name',
 'reputation',
 'website_url',
 'location',
 'about_me',
 'views',
 'up_votes',
 'down_votes',
 'image_url',
 'users_created_at',
 'updated_at']

In [21]:
#new_users = new_users.select('location', F.split(new_users['location'], ',')[0].alias('city'))

new_users_updated = new_users.withColumn('location', F.split(new_users.location, ','))\
.select('users_id', 'display_name', 'reputation', 'website_url', 
                                    'location', 'about_me', 'views', 'up_votes', 
                                    'down_votes', 'image_url', 'users_created_at', 
                                    'updated_at', F.element_at(F.col('location'),-2).alias('city')
        , F.element_at(F.col('location'), -1).alias('country'))

In [22]:
new_users_updated.select('city','country').show(50)

+---------------+-------+
|           city|country|
+---------------+-------+
|      Bangalore|  India|
|      New Delhi|  India|
|      Gharaunda|  India|
|      New Delhi|  India|
|      Jalandhar|  India|
|          Surat|  India|
|    West Bengal|  India|
|           Pune|  India|
|         Mumbai|  India|
|      Bangalore|  India|
|         Mumbai|  India|
|      Bangalore|  India|
|         Mumbai|  India|
|      Hyderabad|  India|
| Madhya Pradesh|  India|
|           null|  India|
|      Bangalore|  India|
|    Naya Raipur|  India|
|      Bangalore|  India|
|     Tamil Nadu|  India|
|      Bangalore|  India|
|           Pune|  India|
|           Pune|  India|
|          Delhi|  India|
|      Bangalore|  India|
|         Mumbai|  India|
|    West Bengal|  India|
|      Bangalore|  India|
|     Tamil Nadu|  India|
|      Bangalore|  India|
|          Surat|  India|
|      Hyderabad|  India|
|      Hyderabad|  India|
|    West Bengal|  India|
|  Uttar Pradesh|  India|
|     Tamil 

In [23]:
joins= new_users_updated.join(questions, new_users_updated.users_id == questions.questions_user_id, 'inner')

In [33]:
joins.count()

464

In [24]:
joins.registerTempTable('new_questions')
new_questions = spark.sql("select * from new_questions where views > 19 ")

In [35]:
new_questions.columns

['users_id',
 'display_name',
 'reputation',
 'website_url',
 'location',
 'about_me',
 'views',
 'up_votes',
 'down_votes',
 'image_url',
 'users_created_at',
 'updated_at',
 'city',
 'country',
 'questions_id',
 'questions_user_id',
 'title',
 'questions_body',
 'accepted_answer_id',
 'questions_score',
 'view_count',
 'questions_comment_count',
 'questions_created_at']

In [36]:
results = new_questions.join(answers, new_questions.questions_created_at == answers.answers_created_at, 'inner')

In [37]:
results.count()

54

In [38]:
results.select('updated_at').show()

+-------------------+
|         updated_at|
+-------------------+
|2019-10-11 08:39:32|
|2019-10-11 08:39:32|
|2019-10-11 08:39:32|
|2019-10-11 08:39:32|
|2019-10-11 08:39:32|
|2019-10-11 08:39:32|
|2019-10-11 08:39:32|
|2019-10-11 08:39:32|
|2019-10-11 08:39:32|
|2019-10-11 08:39:32|
|2019-10-11 08:39:32|
|2019-10-11 08:39:32|
|2019-10-11 08:39:32|
|2019-10-11 08:39:32|
|2019-10-11 08:39:32|
|2019-10-11 08:39:32|
|2019-10-11 08:39:32|
|2019-10-11 08:39:32|
|2019-10-11 08:39:32|
|2019-10-11 08:39:32|
+-------------------+
only showing top 20 rows



In [39]:
results.registerTempTable('results_temp')
spark.sql("select min(updated_at) from results_temp").show()

+-------------------+
|    min(updated_at)|
+-------------------+
|2019-10-11 08:39:32|
+-------------------+



In [28]:
results.dtypes

[('users_id', 'string'),
 ('display_name', 'string'),
 ('reputation', 'string'),
 ('website_url', 'string'),
 ('location', 'array<string>'),
 ('about_me', 'string'),
 ('views', 'string'),
 ('up_votes', 'string'),
 ('down_votes', 'string'),
 ('image_url', 'string'),
 ('users_created_at', 'string'),
 ('updated_at', 'string'),
 ('city', 'string'),
 ('country', 'string'),
 ('questions_id', 'string'),
 ('questions_user_id', 'string'),
 ('title', 'string'),
 ('questions_body', 'string'),
 ('accepted_answer_id', 'string'),
 ('questions_score', 'string'),
 ('view_count', 'string'),
 ('questions_comment_count', 'string'),
 ('questions_created_at', 'string'),
 ('answers_id', 'string'),
 ('answers_user_id', 'string'),
 ('question_id', 'string'),
 ('answers_body', 'string'),
 ('answers_score', 'string'),
 ('answers_comment_count', 'string'),
 ('answers_created_at', 'string')]

In [30]:
type(results)

pyspark.sql.dataframe.DataFrame

In [42]:
tester = results.toPandas()

In [44]:
type(tester)

pandas.core.frame.DataFrame

In [46]:
tester.to_csv('results.csv')

In [45]:
#results.write.format("jdbc").options(
#   url='jdbc:postgresql://localhost:5432/postgres',
#   driver='org.postgresql.Driver',
#   user='postgres',
#   password='postgres',
#   dbtable='stackoverflow_filtered.results'
#).save(mode='append')