 Before you start using this notebook install **com.databricks.spark.xml** Maven module on your Apache Spark and then change **datalake_name** in the Python variable and file system command


In [None]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [None]:
datalake_name = 'cadlstoren2ygnppyehpf2'

In [None]:
%fs ls adl://cadlstoren2ygnppyehpf2.azuredatalakestore.net/

In [None]:
%python
configs = {
  'fs.adl.oauth2.access.token.provider.type': 'CustomAccessTokenProvider',
  'fs.adl.oauth2.access.token.custom.provider': spark.conf.get('spark.databricks.passthrough.adls.tokenProviderClassName')
}
dbutils.fs.mount(
source = f'adl://{datalake_name}.azuredatalakestore.net/',
mount_point = '/mnt/datalake',
extra_configs = configs)

In [None]:
path = f'adl://{datalake_name}.azuredatalakestore.net/Posts.xml'
posts_output_path = f'adl://{datalake_name}.azuredatalakestore.net/posts.parquet'

most_popular_questions_with_unique_tag_parquet_path = f'adl://{datalake_name}.azuredatalakestore.net/most_popular_questions_with_unique_tag.parquet'
most_popular_answers_with_unique_tag_parquet_path = f'adl://{datalake_name}.azuredatalakestore.net/most_popular_answers_with_unique_tag.parquet'
most_popular_accepted_answers_with_unique_tag_parquet_path = f'adl://{datalake_name}.azuredatalakestore.net/most_popular_accepted_answers_with_unique_tag.parquet'


lowest_popular_questions_with_unique_tag_parquet_path = f'adl://{datalake_name}.azuredatalakestore.net/lowest_popular_questions_with_unique_tag.parquet'
lowest_popular_answers_with_unique_tag_parquet_path = f'adl://{datalake_name}.azuredatalakestore.net/lowest_popular_answers_with_unique_tag.parquet'
lowest_popular_accepted_answers_with_unique_tag_parquet_path = f'adl://{datalake_name}.azuredatalakestore.net/lowest_popular_accepted_answers_with_unique_tag.parquet'

In [None]:
%sh

sudo apt-get -y install p7zip-full

wget https://archive.org/download/stackexchange/stackoverflow.com-Posts.7z

7za x stackoverflow.com-Posts.7z

ls -al

In [None]:
%python

dbutils.fs.mv('file:/databricks/driver/Posts.xml', f'{path}')

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
schema = StructType() \
            .add('_Id', IntegerType()) \
            .add('_PostTypeId', IntegerType()) \
            .add('_AcceptedAnswerId', IntegerType()) \
            .add('_ParentId', IntegerType()) \
            .add('_CreationDate', TimestampType()) \
            .add('_DeletionDate', TimestampType()) \
            .add('_Score', IntegerType()) \
            .add('_ViewCount', IntegerType()) \
            .add('_Body', StringType()) \
            .add('_OwnerUserId', IntegerType()) \
            .add('_Title', StringType()) \
            .add('_Tags', StringType()) \
            .add('_AnswerCount', IntegerType()) \
            .add('_CommentCount', IntegerType()) \
            .add('_FavoriteCount', IntegerType()) \
            .add('_ClosedDate', TimestampType())

In [None]:
posts_df = spark.read \
      .format('com.databricks.spark.xml') \
      .option('rootTag', 'posts') \
      .option('rowTag', 'row') \
      .load(path, schema = schema)

In [None]:
posts_df.show()

In [None]:
posts_df \
        .withColumnRenamed('_Id', 'Id') \
        .withColumnRenamed('_PostTypeId', 'PostTypeId') \
        .withColumnRenamed('_AcceptedAnswerId', 'AcceptedAnswerId') \
        .withColumnRenamed('_ParentId', 'ParentId') \
        .withColumnRenamed('_CreationDate', 'CreationDate') \
        .withColumnRenamed('_DeletionDate', 'DeletionDate') \
        .withColumnRenamed('_Score', 'Score') \
        .withColumnRenamed('_ViewCount', 'ViewCount') \
        .withColumnRenamed('_Body', 'Body') \
        .withColumnRenamed('_OwnerUserId', 'OwnerUserId') \
        .withColumnRenamed('_Title', 'Title') \
        .withColumnRenamed('_Tags', 'Tags') \
        .withColumnRenamed('_AnswerCount', 'AnswerCount') \
        .withColumnRenamed('_CommentCount', 'CommentCount') \
        .withColumnRenamed('_FavoriteCount', 'FavoriteCount') \
        .withColumnRenamed('_ClosedDate', 'ClosedDate') \
        .write \
        .mode('overwrite') \
        .format('parquet') \
        .option('path', posts_output_path) \
        .save()

In [None]:
posts_based_parquet_df=spark.read.parquet(posts_output_path)
posts_based_parquet_df.show()

In [None]:
posts_based_parquet_df.count()

In [None]:
questions_df = posts_based_parquet_df.filter((col('PostTypeId') == 1) & (col('ClosedDate').isNull()))
answers_df = posts_based_parquet_df.filter((col('PostTypeId') == 2) & (col('ClosedDate').isNull()))

## Most popular technologies

In [None]:
top_10_the_most_popular_technologies_based_on_the_survey_posts = ['<javascript>', '<html>', '<sql>', '<python>', '<typescript>', '<java>', '<c#>', '<bash>', '<php>', '<c++>']

def most_popular_tag_is_in_tags(column_value):
    for tag in top_10_the_most_popular_technologies_based_on_the_survey_posts:
        if tag in column_value:
            return True
    return False

filter_most_popular_tag_is_in_tags = udf(most_popular_tag_is_in_tags, BooleanType())


most_popular_unique_tags_schema = StructType([
    StructField('Tag', StringType())
])
most_popular_unique_tags_data = map(lambda tag: Row(tag), top_10_the_most_popular_technologies_based_on_the_survey_posts)
most_popular_unique_tags_df = spark.createDataFrame(data = most_popular_unique_tags_data, schema = most_popular_unique_tags_schema)

most_popular_questions_with_unique_tag_df = questions_df \
    .filter(filter_most_popular_tag_is_in_tags(questions_df['Tags'])) \
    .crossJoin(most_popular_unique_tags_df) \
    .filter(col('Tags').contains(col('Tag'))) \
    .drop('Tags')


most_popular_questions_with_unique_tag_df \
    .write \
    .mode('overwrite') \
    .format('parquet') \
    .option('path', most_popular_questions_with_unique_tag_parquet_path) \
    .save()

In [None]:
most_popular_answers_with_unique_tag_df = answers_df.drop('Tags').alias('a') \
    .join(most_popular_questions_with_unique_tag_df.alias('q'), col('a._ParentId') == col('q._Id')) \
    .select('a.*', 'q.Tag')

most_popular_answers_with_unique_tag_df \
    .write \
    .mode('overwrite') \
    .format('parquet') \
    .option('path', most_popular_answers_with_unique_tag_parquet_path) \
    .save()

In [None]:

most_popular_accepted_answers_with_unique_tag_df = answers_df.drop('Tags').alias('a') \
    .join(most_popular_questions_with_unique_tag_df.alias('q'), col('a._Id') == col('q._AcceptedAnswerId')) \
    .select('a.*', 'q.Tag')

most_popular_accepted_answers_with_unique_tag_df \
    .write \
    .mode('overwrite') \
    .format('parquet') \
    .option('path', most_popular_accepted_answers_with_unique_tag_parquet_path) \
    .save()

## Least popular technologies

In [None]:
top_10_the_lowest_popular_technologies_based_on_the_survey_posts = ['<crystal-lang>', '<apl>', '<sas>', '<ocaml>', '<cobol>', '<fortran>', '<erlang>', '<julia>', '<f#>', '<lisp>']

def lowest_popular_tag_is_in_tags(column_value):
    for tag in top_10_the_lowest_popular_technologies_based_on_the_survey_posts:
        if tag in column_value:
            return True
    return False

filter_lowest_popular_tag_is_in_tags = udf(lowest_popular_tag_is_in_tags, BooleanType())


lowest_popular_unique_tags_schema = StructType([
    StructField('Tag', StringType())
])
lowest_popular_unique_tags_data = map(lambda tag: Row(tag), top_10_the_lowest_popular_technologies_based_on_the_survey_posts)
lowest_popular_unique_tags_df = spark.createDataFrame(data = lowest_popular_unique_tags_data, schema = lowest_popular_unique_tags_schema)

lowest_popular_questions_with_unique_tag_df = questions_df \
    .filter(filter_lowest_popular_tag_is_in_tags(questions_df['Tags'])) \
    .crossJoin(lowest_popular_unique_tags_df) \
    .filter(col('Tags').contains(col('Tag'))) \
    .drop('Tags')

lowest_popular_questions_with_unique_tag_df \
    .write \
    .mode('overwrite') \
    .format('parquet') \
    .option('path', lowest_popular_questions_with_unique_tag_parquet_path) \
    .save()

In [None]:
lowest_popular_answers_with_unique_tag_df = answers_df.drop('Tags').alias('a') \
    .join(lowest_popular_questions_with_unique_tag_df.alias('q'), col('a._ParentId') == col('q._Id')) \
    .select('a.*', 'q.Tag')

lowest_popular_answers_with_unique_tag_df \
    .write \
    .mode('overwrite') \
    .format('parquet') \
    .option('path', lowest_popular_answers_with_unique_tag_parquet_path) \
    .save()

In [None]:
lowest_popular_accepted_answers_with_unique_tag_df = answers_df.drop('Tags').alias('a') \
    .join(lowest_popular_questions_with_unique_tag_df.alias('q'), col('a._Id') == col('q._AcceptedAnswerId')) \
    .select('a.*', 'q.Tag')

lowest_popular_accepted_answers_with_unique_tag_df \
    .write \
    .mode('overwrite') \
    .format('parquet') \
    .option('path', lowest_popular_accepted_answers_with_unique_tag_parquet_path) \
    .save()