# Lung Tumor Analyzer

# What does this project do?

This project aims to predict the localization of the lymph nodes involved in the tumor process. Basically, a script takes information about the patients and sends it to a data analysis pipeline that uses a Logistic Regression algorithm and shows the results.

# Why a lung tumor analyzer?

Lung cancer is one of the most common and serious types of cancer. There are usually no signs or symptoms in the early stages of lung cancer. Lung cancer mainly affects older people. It's rare in people younger than 40. More than 4 out of 10 people diagnosed with lung cancer are aged 75 and older.

If the condition is diagnosed early and the cancerous cells are confined to a small area, surgery to remove the affected area of lung may be recommended.

Although people who have never smoked can develop lung cancer, smoking is the most common cause (accounting for about 72% of cases). This is because smoking involves regularly inhaling a number of different toxic substances.

# What results will we get?

The model predicts the localization of the lymph nodes involved in the tumor process as one of the following values:

- `0.0`: the cancer is not thought to have spread to nearby lymph nodes (N0).
- `1.0`: the cancer has also spread to lymph nodes within the lung and/or around the area where the bronchus enters the lung (hilar lymph nodes). These lymph nodes are on the same side as the cancer (N1).
- `2.0`: the cancer has spread to lymph nodes around the carina (the point where the windpipe splits into the left and right bronchi) or in the space between the lungs (mediastinum). These lymph nodes are on the same side as the main lung tumor (N2).
- `3.0`: the cancer has spread to lymph nodes near the collarbone on either side of the body, and/or to hilar or mediastinal lymph nodes on the other side of the body from the main tumor (N3).
- `4.0`: nearby lymph nodes cannot be assessed due to lack of information (NX).
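
Because the model outputs these codes as floats, a small lookup table helps when presenting predictions to a human reader. The sketch below is a hypothetical helper (`N_CATEGORIES` and `to_n_category` are not part of the project), assuming every prediction is exactly one of the five values listed above.

```python
# Hypothetical lookup: numeric prediction -> regional lymph node (N) category
N_CATEGORIES = {
    0.0: "N0",  # no spread to nearby lymph nodes
    1.0: "N1",  # same-side lymph nodes within the lung and/or hilar nodes
    2.0: "N2",  # same-side lymph nodes around the carina or in the mediastinum
    3.0: "N3",  # nodes near the collarbone, or hilar/mediastinal nodes on the other side
    4.0: "NX",  # nearby lymph nodes cannot be assessed
}


def to_n_category(prediction: float) -> str:
    """Return the N category for a model prediction such as 2.0."""
    try:
        return N_CATEGORIES[float(prediction)]
    except KeyError:
        raise ValueError(f"unexpected prediction value: {prediction!r}") from None


print(to_n_category(2.0))  # N2
```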

Source: American Cancer Society, <https://www.cancer.org/cancer/lung-cancer/detection-diagnosis-staging/staging-nsclc.html#:~:text=than%205%20cm).-,The%20cancer%20has%20spread%20to%20lymph%20nodes%20around%20the%20carina,main%20lung%20tumor%20(N2).>

# The data pipeline

![Pipeline.png](images/Pipeline.png)

# Logstash

Data ingestion is handled by Logstash, which provides an easy interface to Kafka and the Elastic Stack. Everything is configured in a single `.conf` file.
Logstash also adds a `@timestamp` field to each JSON event, along with other metadata fields that I preferred to remove with `remove_field`.

```
input {
  # Read the clinical CSV file from its first line
  file {
    path => "/usr/share/logstash/csv/luad_clinical.csv"
    start_position => "beginning"
  }
}

filter {
  # Split each line into named columns and drop the metadata fields Logstash adds
  csv {
    separator => ","
    columns => ["id", "pathology_report_uuid", "year_of_diagnosis", "years_smoked", "pack_years_smoked", "age_at_index",
                "year_of_birth", "year_of_death", "label"]
    remove_field => ["log", "file", "@version", "host", "message", "tags", "event"]
  }
  # Convert the parsed strings to numbers so they can be used as model features
  mutate {
    convert => {
      "id" => "integer"
      "pathology_report_uuid" => "float"
      "year_of_diagnosis" => "float"
      "years_smoked" => "float"
      "pack_years_smoked" => "float"
      "age_at_index" => "float"
      "year_of_birth" => "float"
      "year_of_death" => "float"
      "label" => "float"
    }
  }
}

output {
  # Publish every event as JSON to the "luad" Kafka topic
  kafka {
    codec => json
    topic_id => "luad"
    bootstrap_servers => "10.0.100.23:9092"
  }
}
```
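
For readers more at home in Python than in Logstash's configuration language, the sketch below mimics what the `csv` and `mutate` filters do to a single line. It is an illustration under assumptions, not how Logstash works internally: `to_event` is a hypothetical helper, treating empty cells as null is this sketch's own choice, and the sample row is invented.

```python
import csv
import io
import json
from datetime import datetime, timezone

# Column names and order, copied from the csv filter above
COLUMNS = ["id", "pathology_report_uuid", "year_of_diagnosis", "years_smoked",
           "pack_years_smoked", "age_at_index", "year_of_birth", "year_of_death", "label"]


def to_event(line: str) -> dict:
    """Rough Python analogue of the csv + mutate filters applied to one CSV line."""
    values = next(csv.reader(io.StringIO(line)))   # csv filter: split on ","
    event = {}
    for name, raw in zip(COLUMNS, values):
        if raw == "":
            event[name] = None                     # assumption: empty cells are kept as null
        elif name == "id":
            event[name] = int(raw)                 # mutate: "id" => "integer"
        else:
            event[name] = float(raw)               # mutate: every other column => "float"
    # Logstash stamps each event at ingestion; this sketch uses the current UTC time
    event["@timestamp"] = datetime.now(timezone.utc).isoformat()
    return event


# Invented sample row (year_of_death left empty)
print(json.dumps(to_event("7,0,2010,30,45.5,68,1942,,1")))
```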

# Kafka
Apache Kafka is an open-source distributed event streaming platform for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
In this pipeline, Kafka stores every message in the `luad` topic and delivers the messages, in order, to the consumer (the Spark Stream node).
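
Kafka keeps the messages of a topic partition in an append-only log, and each consumer group tracks its own read position (offset). That is how records are delivered in order without being deleted for other readers. The toy class below models this idea for a single partition; `ToyTopic` is hypothetical and is not the Kafka client API, and it leaves out partitions, replication and retention.

```python
from collections import defaultdict


class ToyTopic:
    """Single-partition topic modeled as an append-only log (illustration only, not the Kafka API)."""

    def __init__(self):
        self.log = []                     # messages are only ever appended
        self.offsets = defaultdict(int)   # next offset to read, one per consumer group

    def produce(self, message) -> int:
        self.log.append(message)
        return len(self.log) - 1          # offset assigned to the new message

    def poll(self, group: str, max_records: int = 10) -> list:
        start = self.offsets[group]
        records = self.log[start:start + max_records]
        self.offsets[group] = start + len(records)   # advance ("commit") this group's position
        return records


toy = ToyTopic()
for patient_id in range(3):
    toy.produce({"id": patient_id})
print(toy.poll("spark"))   # [{'id': 0}, {'id': 1}, {'id': 2}]
print(toy.poll("spark"))   # []
print(toy.poll("audit"))   # [{'id': 0}, {'id': 1}, {'id': 2}]
```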

# Spark

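Apache Spark is an open-source unified analytics engine for large-scale data processing. In this project, it is used to train a machine learning model with Logistic Regression: a `VectorAssembler` packs the numeric patient columns into a single `features` vector, and both stages are chained in a `Pipeline` that is fitted on the training set.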

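```python
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# trainingset: a Spark DataFrame with the numeric columns below plus "label" (loaded beforehand)
assembler = VectorAssembler(inputCols=['pathology_report_uuid', 'year_of_diagnosis', 'years_smoked',
                                       'pack_years_smoked', 'age_at_index', 'year_of_birth', 'year_of_death'],
                            outputCol='features')
regression = LogisticRegression(featuresCol='features', labelCol='label')
pipeline = Pipeline(stages=[assembler, regression])
pipelineFit = pipeline.fit(trainingset)                   # fits both stages on the training data
updated_trainingset = pipelineFit.transform(trainingset)  # adds the "prediction" column
```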
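
Under the hood, the fitted pipeline turns each row into a feature vector and, for a multi-class label like this one, scores every class with its own linear function before picking the most probable class. The pure-Python sketch below reproduces that computation for a multinomial model. `assemble`, `predict`, the sample record and the toy weights are hypothetical; on a real fitted model the weights would correspond to `coefficientMatrix` and `interceptVector` of the `LogisticRegressionModel` stage, assuming Spark chose the multinomial family (its default when there are more than two classes).

```python
import math

# Same column order as the VectorAssembler inputCols above
FEATURES = ['pathology_report_uuid', 'year_of_diagnosis', 'years_smoked',
            'pack_years_smoked', 'age_at_index', 'year_of_birth', 'year_of_death']


def assemble(record: dict) -> list:
    """Mimic VectorAssembler: pack the input columns, in order, into one feature vector.

    A missing value (None) makes float() raise, much like VectorAssembler's
    default handleInvalid="error" rejects nulls.
    """
    return [float(record[name]) for name in FEATURES]


def predict(features, coefficients, intercepts):
    """Multinomial logistic regression: one linear score per class, then softmax."""
    scores = [b + sum(w * x for w, x in zip(row, features))
              for row, b in zip(coefficients, intercepts)]
    top = max(scores)
    exps = [math.exp(s - top) for s in scores]  # shift by the max for numerical stability
    total = sum(exps)
    probabilities = [e / total for e in exps]
    # Like Spark's "prediction" column: index of the most probable class, as a float
    prediction = float(probabilities.index(max(probabilities)))
    return prediction, probabilities


# Invented sample record and toy weights for 5 classes (labels 0.0 to 4.0)
record = {'pathology_report_uuid': 0, 'year_of_diagnosis': 2010, 'years_smoked': 30,
          'pack_years_smoked': 45.5, 'age_at_index': 68, 'year_of_birth': 1942,
          'year_of_death': 2012}
coefficients = [[0.0] * len(FEATURES) for _ in range(5)]
intercepts = [0.0, 0.0, 2.0, 0.0, 0.0]
prediction, probabilities = predict(assemble(record), coefficients, intercepts)
print(prediction)  # 2.0
```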

# Spark Stream

Spark Stream processes the data streaming in from Kafka and sends it to Elasticsearch. It uses fairly simple code to apply the model to every record.

We start by reading the stream from the Kafka server and subscribing to the topic:

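```python
# spark is an existing SparkSession (the Kafka source needs the spark-sql-kafka-0-10 package);
# kafkaServer and topic hold the Kafka bootstrap address and topic name, defined earlier in the script
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafkaServer) \
    .option("subscribe", topic) \
    .load()
```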

After that, we cast the message values to strings and parse them with a schema (`dataKafka`). Then we process each micro-batch with `elaborate` and write the results.

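```python
from pyspark.sql.functions import from_json

# dataKafka is the StructType schema of the JSON events (defined earlier in the script);
# elaborate, shown below, must already be defined when this runs
df.selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", dataKafka).alias("data")) \
    .select("data.*") \
    .writeStream \
    .foreachBatch(elaborate) \
    .start() \
    .awaitTermination()
```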
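
The cast-and-parse step can be mimicked in plain Python to see what happens to each message: Kafka hands over the value as raw bytes, the cast turns it into text, and parsing against a schema keeps exactly the schema's fields, with missing ones becoming null. In the sketch below, `parse_value` is a hypothetical helper, `SCHEMA_FIELDS` is an invented stand-in for the real `dataKafka` schema (which is not shown in this README), and the sample message is made up.

```python
import json

# Hypothetical subset of the dataKafka schema (field names only)
SCHEMA_FIELDS = ["id", "@timestamp", "year_of_diagnosis", "years_smoked", "label"]


def parse_value(value: bytes) -> dict:
    """Rough Python analogue of CAST(value AS STRING) + from_json + select("data.*")."""
    text = value.decode("utf-8")     # Kafka delivers the value as raw bytes
    data = json.loads(text)          # parse the JSON document written by Logstash
    # Keep exactly the schema fields; a field missing from the message becomes None (null)
    return {name: data.get(name) for name in SCHEMA_FIELDS}


message = b'{"id": 7, "@timestamp": "2023-01-01T00:00:00Z", "years_smoked": 30.0, "label": 1.0, "host": "x"}'
print(parse_value(message))
```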

The `elaborate` function applies the model to each batch of records and sends the results to Elasticsearch. It also prints the results to standard output.

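```python
from pyspark.sql import DataFrame


# pipelineFit (the fitted model), elastic_host and elastic_index are defined earlier in the script
def elaborate(batch_df: DataFrame, batch_id: int):
    batch_df.show(truncate=False)
    if not batch_df.rdd.isEmpty():
        print("******************** \n elaborate\n")
        # Apply the fitted pipeline (VectorAssembler + LogisticRegression) to the batch
        data2 = pipelineFit.transform(batch_df)
        data2.show()
        data2.summary().show()  # summary() returns a DataFrame, so it has to be shown explicitly

        print("************************ \nSend to ES \n")
        # Index the selected fields; "id" is used as the Elasticsearch document _id
        data2.select("id", "@timestamp", "year_of_diagnosis", "years_smoked", "pack_years_smoked",
                     "age_at_index", "year_of_birth", "year_of_death", "prediction") \
            .write \
            .format("org.elasticsearch.spark.sql") \
            .mode("append") \
            .option("es.mapping.id", "id") \
            .option("es.nodes", elastic_host) \
            .save(elastic_index)
```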
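
Two details of this step are easy to miss: Spark calls `elaborate` once per micro-batch together with an increasing `batch_id`, and because `es.mapping.id` is set to `id`, re-sending a record with an existing `id` overwrites that document (the connector's default `es.write.operation` is `index`) instead of creating a duplicate. The pure-Python toy below models both behaviors; `run_micro_batches`, `toy_elaborate` and the in-memory `index` dict are hypothetical stand-ins, not Spark or Elasticsearch APIs.

```python
def run_micro_batches(records, batch_size, process):
    """Toy model of foreachBatch: split the stream into batches and pass each one with its id."""
    for batch_id, start in enumerate(range(0, len(records), batch_size)):
        process(records[start:start + batch_size], batch_id)


index = {}  # stands in for the Elasticsearch index, keyed by document _id


def toy_elaborate(batch, batch_id):
    if not batch:                  # same guard as `batch_df.rdd.isEmpty()`
        return
    for doc in batch:
        # With es.mapping.id="id", a document whose id already exists is replaced
        index[doc["id"]] = doc


stream = [{"id": 1, "prediction": 0.0}, {"id": 2, "prediction": 2.0}, {"id": 1, "prediction": 1.0}]
run_micro_batches(stream, batch_size=2, process=toy_elaborate)
print(len(index), index[1]["prediction"])  # 2 1.0
```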

# Elasticsearch

Elasticsearch is a distributed, free and open search and analytics engine for all types of data.

The Spark Stream node writes the documents to the index named in the script (`elastic_index`, on the `elastic_host` node). If the index does not exist yet, the elasticsearch-hadoop connector creates it by default.
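
Once the predictions are indexed, they can also be queried directly. As an example, the search body below counts documents per predicted class with a `terms` aggregation, roughly the kind of request behind a Kibana chart split by `prediction`. It is a sketch: `prediction_counts_query` is a hypothetical helper, and the host, index and port (9200, the Elasticsearch default HTTP port) in the commented `curl` line are placeholders for the values configured in the script.

```python
import json


def prediction_counts_query(field: str = "prediction") -> dict:
    """Build an Elasticsearch search body that counts documents per predicted class."""
    return {
        "size": 0,  # return no individual hits, only the aggregation
        "aggs": {
            "by_prediction": {
                "terms": {"field": field, "size": 5}  # five possible classes (N0 to N3, NX)
            }
        },
    }


# The body could be sent with, for example:
# curl -X POST "http://<elastic_host>:9200/<elastic_index>/_search" \
#      -H 'Content-Type: application/json' -d '<body>'
print(json.dumps(prediction_counts_query(), indent=2))
```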

After that, we can proceed with the visualization in Kibana. Here is everything running:
![dashboard.png](images/dashboard.png)

![meme1.png](images/meme1.png) ![meme2.png](images/meme2.png) ![meme3.png](images/meme3.png)