## Survival Analysis

In [1]:
from pyspark.sql.functions import udf,col,split,regexp_extract_all,regexp_extract,explode,size,countDistinct, when, max, min, count
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


### Load dataset

In [2]:
# Create a Spark session
spark = SparkSession.builder.appName("SurvivalDataAnalysis").master("local[*]") \
    .config("spark.driver.memory", "64g") \
    .config("spark.executor.meomory", "64g") \
        .getOrCreate()

    # .config("spark.executor.meomory", "24g") \
    #     .config("spark.driver.maxResultSize", "16g") \


24/01/28 22:10:09 WARN Utils: Your hostname, cssh-alpha resolves to a loopback address: 127.0.1.1; using 134.130.186.164 instead (on interface bond0)
24/01/28 22:10:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/28 22:10:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# spark.stop()

In [4]:
# Specify the path to the folder containing Orc files
input_folder_revision = "/home/cxu/swh-popular-3k-python/revision"
input_folder_snapshot = "/home/cxu/code-survival-analysis/snapshot_revisions_explored"
# input_folder_release = "/home/cxu/swh-popular-3k-python/release"
input_folder_origin_visit_status = "/home/cxu/swh-popular-3k-python/origin_visit_status"
input_folder_origin_visit = "/home/cxu/swh-popular-3k-python/origin_visit"

# Read files into a DataFrame
revision = spark.read.format("orc").load(input_folder_revision)
snapshot = spark.read.format("json").load(input_folder_snapshot)
# release = spark.read.format("orc").load(input_folder_release)
origin_visit_status = spark.read.format("orc").load(input_folder_origin_visit_status)
origin_visit = spark.read.format("orc").load(input_folder_origin_visit)

# Data description
# Print the schema of the dataset
tables = [revision, snapshot, origin_visit_status, origin_visit]
for data in tables:
    data.printSchema()
    # Show the first 5 rows of the dataset
    data.show(1)
    # Get the length of the dataset
    dataset_length = data.count()
    # Show the length of the dataset
    print(f"Length of the dataset: {dataset_length} rows")

                                                                                

root
 |-- id: string (nullable = true)
 |-- message: binary (nullable = true)
 |-- author: binary (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- date_offset: short (nullable = true)
 |-- committer: binary (nullable = true)
 |-- committer_date: timestamp (nullable = true)
 |-- committer_offset: short (nullable = true)
 |-- directory: string (nullable = true)

+--------------------+--------------------+--------------------+-------------------+-----------+--------------------+-------------------+----------------+--------------------+
|                  id|             message|              author|               date|date_offset|           committer|     committer_date|committer_offset|           directory|
+--------------------+--------------------+--------------------+-------------------+-----------+--------------------+-------------------+----------------+--------------------+
|e0b7070276027464e...|[66 69 78 69 6E 6...|[EE 2C E1 74 91 2...|2018-10-23 21:01:57|        120|

                                                                                

Length of the dataset: 76359551 rows
root
 |-- origin: string (nullable = true)
 |-- visit: long (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- status: string (nullable = true)
 |-- snapshot: string (nullable = true)

+--------------------+-----+--------------------+-------+--------------------+
|              origin|visit|                date| status|            snapshot|
+--------------------+-----+--------------------+-------+--------------------+
|deb://Debian/pack...|  287|2017-12-22 16:34:...|partial|48cfa88294968d0de...|
+--------------------+-----+--------------------+-------+--------------------+
only showing top 1 row

Length of the dataset: 784380 rows
root
 |-- origin: string (nullable = true)
 |-- visit: long (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- type: string (nullable = true)

+--------------------+-----+--------------------+----+
|              origin|visit|                date|type|
+--------------------+-----+--------------------+

In [5]:
snapshot = snapshot.dropDuplicates().na.drop()

In [6]:
# snapshot.count()

In [7]:
origin_visit = origin_visit.dropDuplicates().na.drop()
origin_visit_status = origin_visit_status.dropDuplicates().na.drop()
revision = revision.dropDuplicates().na.drop()

In [25]:
temp = revision \
    .join(snapshot, revision["id"] == snapshot["revision"], "inner")\
    .join(origin_visit_status, snapshot["snapshot"] == origin_visit_status["snapshot"], "inner")\
    .join(origin_visit, (origin_visit["origin"] == origin_visit_status["origin"]) & (origin_visit["visit"] == origin_visit_status["visit"]), "inner") \
    .filter((revision.date >= '2005-01-01') & (revision.date <= '2019-12-31'))
    

In [26]:
temp.show()



+--------------------+--------------------+--------------------+-------------------+-----------+--------------------+-------------------+----------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+------+--------------------+--------------------+-----+--------------------+----+
|                  id|             message|              author|               date|date_offset|           committer|     committer_date|committer_offset|           directory|            revision|            snapshot|              origin|visit|                date|status|            snapshot|              origin|visit|                date|type|
+--------------------+--------------------+--------------------+-------------------+-----------+--------------------+-------------------+----------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+------+--------------------+-------------

                                                                                

In [None]:
result = revision \
    .join(snapshot, revision["id"] == snapshot["revision"], "inner")\
    .join(origin_visit_status, snapshot["snapshot"] == origin_visit_status["snapshot"], "inner")\
    .join(origin_visit, (origin_visit["origin"] == origin_visit_status["origin"]) & (origin_visit["visit"] == origin_visit_status["visit"]), "inner") \
    .filter((revision.date >= '2005-01-01') & (revision.date <= '2019-12-31'))\
    .groupBy([origin_visit.origin, origin_visit.type])\
    .agg(
        min(revision.date).alias("start_date"),
        max(revision.date).alias("end_date"),
        countDistinct(revision.author).alias("author_count"),
        countDistinct(revision.date).alias("rev_count")
    )


In [14]:
result.show()



+--------------------+----+-------------------+-------------------+------------+---------+
|              origin|type|         start_date|           end_date|author_count|rev_count|
+--------------------+----+-------------------+-------------------+------------+---------+
|https://github.co...| git|2019-03-11 21:21:26|2019-12-26 22:55:17|         104|     2183|
|https://gitlab.co...| git|2013-01-03 20:44:50|2019-12-24 00:39:24|          19|      512|
|https://gitlab.co...| git|2011-01-28 15:05:17|2018-09-18 20:54:52|          27|      236|
|https://github.co...| git|2017-01-27 05:48:19|2019-07-29 08:42:52|          10|      613|
|https://pypi.org/...|pypi|2010-09-09 10:53:08|2019-10-20 09:15:24|           1|       43|
|https://pypi.org/...|pypi|2013-11-10 18:04:39|2019-11-23 21:17:42|           1|       61|
|https://github.co...| git|2019-04-04 12:27:55|2019-12-29 17:02:23|          59|      608|
|https://github.co...| git|2012-10-17 04:07:16|2019-10-30 13:22:29|         162|      821|

                                                                                

In [12]:
result.count()

                                                                                

2133

In [18]:
result_censored = revision \
    .join(snapshot, revision["id"] == snapshot["revision"])\
    .join(origin_visit_status, snapshot["snapshot"] == origin_visit_status["snapshot"])\
    .join(origin_visit, origin_visit["origin"] == origin_visit_status["origin"]) \
    .withColumn("censored", when(revision["date"] >= '2019-12-31', 1).otherwise(0))\
    .groupBy([origin_visit.origin, origin_visit.type])\
    .agg(
        max("censored").alias("censored")
    )

In [19]:
result_censored.show()



+--------------------+----+--------+
|              origin|type|censored|
+--------------------+----+--------+
|https://github.co...| git|       1|
|https://gitlab.co...| git|       1|
|https://pypi.org/...|pypi|       1|
|https://github.co...| git|       0|
|https://pypi.org/...|pypi|       1|
|https://gitlab.co...| git|       1|
|https://github.co...| git|       1|
|https://github.co...| git|       1|
|https://pypi.org/...|pypi|       1|
|https://github.co...| git|       1|
|https://gitlab.co...| git|       0|
|https://github.co...| git|       0|
|https://pypi.org/...|pypi|       1|
|https://github.co...| git|       1|
|https://github.co...| git|       1|
|https://github.co...| git|       1|
|https://pypi.org/...|pypi|       1|
|https://pypi.org/...|pypi|       0|
|https://pypi.org/...|pypi|       1|
|https://github.co...| git|       1|
+--------------------+----+--------+
only showing top 20 rows



                                                                                

In [21]:
result_censored.toPandas().to_csv('data/sql_data_censored.csv', index=False)

                                                                                

In [15]:
result.toPandas().to_csv('data/sql_data_filtered.csv', index=False)

                                                                                

In [22]:
df = result \
    .join(result_censored, result["origin"] == result_censored["origin"])\
    .select(result.origin, result.type, result.start_date, result.end_date, result.author_count, result.rev_count, result_censored.censored)

In [23]:
df.show()

[Stage 266:>                                                        (0 + 1) / 1]

+--------------------+----+-------------------+-------------------+------------+---------+--------+
|              origin|type|         start_date|           end_date|author_count|rev_count|censored|
+--------------------+----+-------------------+-------------------+------------+---------+--------+
|deb://Debian/pack...| deb|2012-04-18 17:06:21|2019-10-11 00:10:47|           4|        8|       1|
|deb://Debian/pack...| deb|2008-10-19 12:19:40|2019-08-09 18:47:59|           4|        5|       0|
|deb://Debian/pack...| deb|2016-09-05 18:27:20|2019-08-25 14:20:15|           1|        7|       1|
|deb://Debian/pack...| deb|2012-04-18 17:22:57|2019-11-26 19:11:26|           2|       22|       1|
|deb://Debian/pack...| deb|2009-11-14 14:58:53|2019-12-22 22:26:30|           7|       11|       1|
|deb://Debian/pack...| deb|2014-10-19 23:21:11|2019-10-19 21:20:41|           4|       10|       1|
|deb://Debian/pack...| deb|2011-06-02 16:27:14|2019-10-10 20:03:46|           4|        4|       1|


                                                                                

In [24]:
df.toPandas().to_csv('data/sql_data_filtered_censored.csv', index=False)

24/01/28 23:06:37 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/28 23:06:37 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/28 23:06:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/28 23:06:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                